diff --git a/extractor.go b/extractor.go deleted file mode 100644 index fe6a1da..0000000 --- a/extractor.go +++ /dev/null @@ -1,161 +0,0 @@ -package tcp - -import ( - "fmt" - "reflect" - "strconv" - "strings" - - "github.com/unistack-org/micro/v3/registry" - "github.com/unistack-org/micro/v3/server" - "github.com/unistack-org/micro/v3/util/addr" -) - -func serviceDef(opts server.Options) *registry.Service { - var advt, host string - var port int - - if len(opts.Advertise) > 0 { - advt = opts.Advertise - } else { - advt = opts.Address - } - - parts := strings.Split(advt, ":") - if len(parts) > 1 { - host = strings.Join(parts[:len(parts)-1], ":") - port, _ = strconv.Atoi(parts[len(parts)-1]) - } else { - host = parts[0] - } - - addr, err := addr.Extract(host) - if err != nil { - addr = host - } - - node := ®istry.Node{ - Id: opts.Name + "-" + opts.Id, - Address: fmt.Sprintf("%s:%d", addr, port), - Metadata: opts.Metadata, - } - - node.Metadata["server"] = "tcp" - node.Metadata["broker"] = opts.Broker.String() - node.Metadata["registry"] = opts.Registry.String() - node.Metadata["protocol"] = "tcp" - - return ®istry.Service{ - Name: opts.Name, - Version: opts.Version, - Nodes: []*registry.Node{node}, - } -} - -func extractValue(v reflect.Type, d int) *registry.Value { - if d == 3 { - return nil - } - if v == nil { - return nil - } - - if v.Kind() == reflect.Ptr { - v = v.Elem() - } - - arg := ®istry.Value{ - Name: v.Name(), - Type: v.Name(), - } - - switch v.Kind() { - case reflect.Struct: - for i := 0; i < v.NumField(); i++ { - f := v.Field(i) - val := extractValue(f.Type, d+1) - if val == nil { - continue - } - - // if we can find a json tag use it - if tags := f.Tag.Get("json"); len(tags) > 0 { - parts := strings.Split(tags, ",") - val.Name = parts[0] - } - - // if there's no name default it - if len(val.Name) == 0 { - val.Name = v.Field(i).Name - } - - arg.Values = append(arg.Values, val) - } - case reflect.Slice: - p := v.Elem() - if p.Kind() == reflect.Ptr { - p = p.Elem() - } - arg.Type = "[]" + p.Name() - val := extractValue(v.Elem(), d+1) - if val != nil { - arg.Values = append(arg.Values, val) - } - } - - return arg -} - -func extractEndpoint(method reflect.Method) *registry.Endpoint { - if method.PkgPath != "" { - return nil - } - - var rspType, reqType reflect.Type - var stream bool - mt := method.Type - - switch mt.NumIn() { - case 3: - reqType = mt.In(1) - rspType = mt.In(2) - case 4: - reqType = mt.In(2) - rspType = mt.In(3) - default: - return nil - } - - // are we dealing with a stream? - switch rspType.Kind() { - case reflect.Func, reflect.Interface: - stream = true - } - - request := extractValue(reqType, 0) - response := extractValue(rspType, 0) - - return ®istry.Endpoint{ - Name: method.Name, - Request: request, - Response: response, - Metadata: map[string]string{ - "stream": fmt.Sprintf("%v", stream), - }, - } -} - -func extractSubValue(typ reflect.Type) *registry.Value { - var reqType reflect.Type - switch typ.NumIn() { - case 1: - reqType = typ.In(0) - case 2: - reqType = typ.In(1) - case 3: - reqType = typ.In(2) - default: - return nil - } - return extractValue(reqType, 0) -} diff --git a/go.mod b/go.mod index 42fc74d..aa0a142 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,6 @@ module github.com/unistack-org/micro-server-tcp go 1.13 require ( - github.com/unistack-org/micro-broker-memory v0.0.0-20200905101815-4594aa2807e9 - github.com/unistack-org/micro-registry-memory v0.0.0-20200905064113-34be66be749f - github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94 + github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 golang.org/x/net v0.0.0-20200904194848-62affa334b73 ) diff --git a/go.sum b/go.sum index fbab51e..aa09c2c 100644 --- a/go.sum +++ b/go.sum @@ -279,26 +279,18 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= -github.com/unistack-org/micro-broker-memory v0.0.0-20200905101815-4594aa2807e9 h1:SHiNeIiNjs7BOjmS4tpb3amZKldUTigFiYtWzWOvHCU= -github.com/unistack-org/micro-broker-memory v0.0.0-20200905101815-4594aa2807e9/go.mod h1:j19KFne7Y8JvNXHwhjgjVJWAZDoqbnArZTMDEjyVWu0= +github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= -github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b h1:v5Ak+Sr780jZclFDnx82g5biF0N5HRVKphEpJhbnVUs= -github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34 h1:VHc98t4SoiCF/jbkFu2e/j+IyJ/+MFQ1T+INNL7LubU= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34/go.mod h1:fT1gYn+TtfVZZ5tNx56bZIncJjmlji66g7GKdWua5hE= github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc h1:hHAU3rgeiA0LaudfNdMLf9/jkOBeFxvJdnwXevviZF8= github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.mod h1:il8nz4ZEcX3Usyfrtwy+YtQcb7xSUSFJdSe8PBJ9gOA= -github.com/unistack-org/micro-registry-memory v0.0.0-20200905064113-34be66be749f h1:aJwqchNCtEYpwCi/Y5MDhQuc1d8RQ5CX7ZgdmFhYeIY= -github.com/unistack-org/micro-registry-memory v0.0.0-20200905064113-34be66be749f/go.mod h1:eqAeQoWZMLVScoXEurBfkI5I96Gl1MCN2A/cX1JqaOY= github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200904234316-e7d418183b62/go.mod h1:mB0h+i3Sa4jD8G2dv97cAAdyh01hVQWKw4xSdmTpyOo= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101738-21d5ca1cddc1 h1:Ki+acK9YEn6OuqOQkZZPrGEslmeMvS9kPSObU01yn2E= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922101738-21d5ca1cddc1/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94 h1:1I3f/gjKIw/L61VsZesdOhzWzbvga0U867MOckvzVB0= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 h1:3d/HgT7Iq/UIw5OGyzfUeZPJwydhBohh9shyGJH14EA= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/subscriber.go b/subscriber.go index d96ad1e..5321552 100644 --- a/subscriber.go +++ b/subscriber.go @@ -83,7 +83,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio endpoints = append(endpoints, ®istry.Endpoint{ Name: "Func", - Request: extractSubValue(typ), + Request: registry.ExtractSubValue(typ), Metadata: map[string]string{ "topic": topic, "subscriber": "true", @@ -111,7 +111,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio endpoints = append(endpoints, ®istry.Endpoint{ Name: name + "." + method.Name, - Request: extractSubValue(method.Type), + Request: registry.ExtractSubValue(method.Type), Metadata: map[string]string{ "topic": topic, "subscriber": "true", diff --git a/tcp.go b/tcp.go index 893fa4d..711bbf8 100644 --- a/tcp.go +++ b/tcp.go @@ -16,6 +16,7 @@ import ( "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/server" + regutil "github.com/unistack-org/micro/v3/util/registry" "golang.org/x/net/netutil" ) @@ -138,7 +139,12 @@ func (h *tcpServer) Register() error { eps := h.hd.Endpoints() h.Unlock() - service := serviceDef(opts) + service, err := regutil.NewService(h) + if err != nil { + return err + } + service.Metadata["protocol"] = "tcp" + service.Metadata["transport"] = "tcp" service.Endpoints = eps h.Lock() @@ -204,7 +210,10 @@ func (h *tcpServer) Deregister() error { logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id) - service := serviceDef(opts) + service, err := regutil.NewService(h) + if err != nil { + return err + } if err := opts.Registry.Deregister(service); err != nil { return err } diff --git a/tcp_test.go b/tcp_test.go deleted file mode 100644 index 6e55b6d..0000000 --- a/tcp_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package tcp_test - -import ( - "fmt" - "io" - "net" - "testing" - "time" - - bmemory "github.com/unistack-org/micro-broker-memory" - rmemory "github.com/unistack-org/micro-registry-memory" - tcp "github.com/unistack-org/micro-server-tcp" - "github.com/unistack-org/micro/v3/broker" - "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/server" -) - -type testHandler struct { - done chan struct{} -} - -func TestTCPServer(t *testing.T) { - reg := rmemory.NewRegistry() - brk := bmemory.NewBroker(broker.Registry(reg)) - // create server - srv := tcp.NewServer(server.Registry(reg), server.Broker(brk), server.Address("127.0.0.1:65000")) - - // create handler - h := &testHandler{done: make(chan struct{})} - - // register handler - if err := srv.Handle(srv.NewHandler(h)); err != nil { - t.Fatal(err) - } - - // start server - if err := srv.Start(); err != nil { - t.Fatal(err) - } - - // lookup server - service, err := reg.GetService(server.DefaultName) - if err != nil { - t.Fatal(err) - } - - if len(service) != 1 { - t.Fatalf("Expected 1 service got %d: %+v", len(service), service) - } - - if len(service[0].Nodes) != 1 { - t.Fatalf("Expected 1 node got %d: %+v", len(service[0].Nodes), service[0].Nodes) - } - - go func() { - <-h.done - // stop server - if err := srv.Stop(); err != nil { - t.Fatal(err) - } - }() - - c, err := net.DialTimeout("tcp", srv.Options().Address, 5*time.Second) - if err != nil { - t.Fatal(err) - } - defer c.Close() - - if _, err = c.Write([]byte("test")); err != nil { - t.Fatal(err) - } -} - -func (h *testHandler) Serve(c net.Conn) { - var n int - var err error - - defer c.Close() - - buf := make([]byte, 1024*8) // 8k buffer - - for { - n, err = c.Read(buf) - if err != nil && err == io.EOF { - return - } else if err != nil { - logger.Fatal(err) - } - fmt.Printf("%s", buf[:n]) - close(h.done) - } -}