diff --git a/client/client.go b/client/client.go index 32cb9a6b..4cfe0bc8 100644 --- a/client/client.go +++ b/client/client.go @@ -15,7 +15,7 @@ type Client interface { Init(...Option) error Options() Options NewMessage(topic string, msg interface{}, opts ...MessageOption) Message - NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request + NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Publish(ctx context.Context, msg Message, opts ...PublishOption) error @@ -38,8 +38,8 @@ type Message interface { type Request interface { // The service to call Service() string - // The method to call - Method() string + // The endpoint to call + Endpoint() string // The content type ContentType() string // The unencoded request body @@ -125,8 +125,8 @@ func NewClient(opt ...Option) Client { // Creates a new request using the default client. Content Type will // be set to the default within options and use the appropriate codec -func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return DefaultClient.NewRequest(service, method, request, reqOpts...) +func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request { + return DefaultClient.NewRequest(service, endpoint, request, reqOpts...) } // Creates a streaming connection with a service and returns responses on the diff --git a/client/mock/mock.go b/client/mock/mock.go index db267530..e9022c71 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -16,7 +16,7 @@ var ( ) type MockResponse struct { - Method string + Endpoint string Response interface{} Error error } @@ -54,8 +54,8 @@ func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.Me return m.Client.NewMessage(topic, msg, opts...) } -func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { - return m.Client.NewRequest(service, method, req, reqOpts...) +func (m *MockClient) NewRequest(service, endpoint string, req interface{}, reqOpts ...client.RequestOption) client.Request { + return m.Client.NewRequest(service, endpoint, req, reqOpts...) } func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { @@ -68,7 +68,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface } for _, r := range response { - if r.Method != req.Method() { + if r.Endpoint != req.Endpoint() { continue } @@ -91,7 +91,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface return nil } - return fmt.Errorf("rpc: can't find service %s", req.Method()) + return fmt.Errorf("rpc: can't find service %s", req.Endpoint()) } func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { diff --git a/client/mock/mock_test.go b/client/mock/mock_test.go index db9c1289..5dc5de1b 100644 --- a/client/mock/mock_test.go +++ b/client/mock/mock_test.go @@ -13,17 +13,17 @@ func TestClient(t *testing.T) { } response := []MockResponse{ - {Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}}, - {Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}}, - {Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")}, - {Method: "Foo.Func", Response: func() string { return "string" }}, - {Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }}, + {Endpoint: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}}, + {Endpoint: "Foo.Struct", Response: &TestResponse{Param: "aparam"}}, + {Endpoint: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")}, + {Endpoint: "Foo.Func", Response: func() string { return "string" }}, + {Endpoint: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }}, } c := NewClient(Response("go.mock", response)) for _, r := range response { - req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) + req := c.NewRequest("go.mock", r.Endpoint, map[string]interface{}{"foo": "bar"}) var rsp interface{} err := c.Call(context.TODO(), req, &rsp) diff --git a/client/rpc_client.go b/client/rpc_client.go index b85e76b7..34fd8671 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -458,7 +458,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt } b := &buffer{bytes.NewBuffer(nil)} if err := cf(b).Write(&codec.Message{ - Type: codec.Publication, + Target: msg.Topic(), + Type: codec.Publication, Header: map[string]string{ "X-Micro-Id": id, "X-Micro-Topic": msg.Topic(), diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index 6bafbdf5..72140711 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -14,7 +14,7 @@ import ( func TestCallAddress(t *testing.T) { var called bool service := "test.service" - method := "Test.Method" + endpoint := "Test.Endpoint" address := "10.1.10.1:8080" wrap := func(cf CallFunc) CallFunc { @@ -25,8 +25,8 @@ func TestCallAddress(t *testing.T) { return fmt.Errorf("expected service: %s got %s", service, req.Service()) } - if req.Method() != method { - return fmt.Errorf("expected service: %s got %s", method, req.Method()) + if req.Endpoint() != endpoint { + return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } if addr != address { @@ -45,7 +45,7 @@ func TestCallAddress(t *testing.T) { ) c.Options().Selector.Init(selector.Registry(r)) - req := c.NewRequest(service, method, nil) + req := c.NewRequest(service, endpoint, nil) // test calling remote address if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { @@ -60,7 +60,7 @@ func TestCallAddress(t *testing.T) { func TestCallRetry(t *testing.T) { service := "test.service" - method := "Test.Method" + endpoint := "Test.Endpoint" address := "10.1.10.1:8080" var called int @@ -84,7 +84,7 @@ func TestCallRetry(t *testing.T) { ) c.Options().Selector.Init(selector.Registry(r)) - req := c.NewRequest(service, method, nil) + req := c.NewRequest(service, endpoint, nil) // test calling remote address if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { @@ -101,7 +101,7 @@ func TestCallWrapper(t *testing.T) { var called bool id := "test.1" service := "test.service" - method := "Test.Method" + endpoint := "Test.Endpoint" host := "10.1.10.1" port := 8080 address := "10.1.10.1:8080" @@ -114,8 +114,8 @@ func TestCallWrapper(t *testing.T) { return fmt.Errorf("expected service: %s got %s", service, req.Service()) } - if req.Method() != method { - return fmt.Errorf("expected service: %s got %s", method, req.Method()) + if req.Endpoint() != endpoint { + return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) } if addr != address { @@ -146,7 +146,7 @@ func TestCallWrapper(t *testing.T) { }, }) - req := c.NewRequest(service, method, nil) + req := c.NewRequest(service, endpoint, nil) if err := c.Call(context.Background(), req, nil); err != nil { t.Fatal("call wrapper error", err) } diff --git a/client/rpc_codec.go b/client/rpc_codec.go index fe694380..aaa01569 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -102,14 +102,14 @@ func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error { c.buf.wbuf.Reset() m := &codec.Message{ - Id: wm.Id, - Target: wm.Target, - Method: wm.Method, - Type: codec.Request, + Id: wm.Id, + Target: wm.Target, + Endpoint: wm.Endpoint, + Type: codec.Request, Header: map[string]string{ - "X-Micro-Id": wm.Id, - "X-Micro-Service": wm.Target, - "X-Micro-Method": wm.Method, + "X-Micro-Id": wm.Id, + "X-Micro-Service": wm.Target, + "X-Micro-Endpoint": wm.Endpoint, }, } @@ -150,7 +150,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { // read header err := c.codec.ReadHeader(&me, r) - wm.Method = me.Method + wm.Endpoint = me.Endpoint wm.Id = me.Id wm.Error = me.Error @@ -160,8 +160,8 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { } // check method in header - if len(me.Method) == 0 { - wm.Method = me.Header["X-Micro-Method"] + if len(me.Endpoint) == 0 { + wm.Endpoint = me.Header["X-Micro-Endpoint"] } if len(me.Id) == 0 { diff --git a/client/rpc_request.go b/client/rpc_request.go index b7f9695e..746b4be5 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -6,14 +6,14 @@ import ( type rpcRequest struct { service string - method string + endpoint string contentType string codec codec.Codec body interface{} opts RequestOptions } -func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { +func newRequest(service, endpoint string, request interface{}, contentType string, reqOpts ...RequestOption) Request { var opts RequestOptions for _, o := range reqOpts { @@ -27,7 +27,7 @@ func newRequest(service, method string, request interface{}, contentType string, return &rpcRequest{ service: service, - method: method, + endpoint: endpoint, body: request, contentType: contentType, opts: opts, @@ -42,8 +42,8 @@ func (r *rpcRequest) Service() string { return r.service } -func (r *rpcRequest) Method() string { - return r.method +func (r *rpcRequest) Endpoint() string { + return r.endpoint } func (r *rpcRequest) Body() interface{} { diff --git a/client/rpc_request_test.go b/client/rpc_request_test.go index bca3dd2f..e3516794 100644 --- a/client/rpc_request_test.go +++ b/client/rpc_request_test.go @@ -5,19 +5,19 @@ import ( ) func TestRequestOptions(t *testing.T) { - r := newRequest("service", "method", nil, "application/json") + r := newRequest("service", "endpoint", nil, "application/json") if r.Service() != "service" { t.Fatalf("expected 'service' got %s", r.Service()) } - if r.Method() != "method" { - t.Fatalf("expected 'method' got %s", r.Method()) + if r.Endpoint() != "endpoint" { + t.Fatalf("expected 'endpoint' got %s", r.Endpoint()) } if r.ContentType() != "application/json" { - t.Fatalf("expected 'method' got %s", r.ContentType()) + t.Fatalf("expected 'endpoint' got %s", r.ContentType()) } - r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf")) + r2 := newRequest("service", "endpoint", nil, "application/json", WithContentType("application/protobuf")) if r2.ContentType() != "application/protobuf" { - t.Fatalf("expected 'method' got %s", r2.ContentType()) + t.Fatalf("expected 'endpoint' got %s", r2.ContentType()) } } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 838172d4..627d2099 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -46,10 +46,10 @@ func (r *rpcStream) Send(msg interface{}) error { } req := codec.Message{ - Id: r.id, - Target: r.request.Service(), - Method: r.request.Method(), - Type: codec.Request, + Id: r.id, + Target: r.request.Service(), + Endpoint: r.request.Endpoint(), + Type: codec.Request, } if err := r.codec.Write(&req, msg); err != nil { diff --git a/codec/codec.go b/codec/codec.go index f4d4f295..868d2dd1 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -50,11 +50,11 @@ type Marshaler interface { // the communication, likely followed by the body. // In the case of an error, body may be nil. type Message struct { - Id string - Type MessageType - Target string - Method string - Error string + Id string + Type MessageType + Target string + Endpoint string + Error string // The values read from the socket Header map[string]string diff --git a/codec/grpc/grpc.go b/codec/grpc/grpc.go index f3703840..79c30f4b 100644 --- a/codec/grpc/grpc.go +++ b/codec/grpc/grpc.go @@ -29,7 +29,7 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { path := m.Header[":path"] if len(path) == 0 || path[0] != '/' { m.Target = m.Header["X-Micro-Service"] - m.Method = m.Header["X-Micro-Method"] + m.Endpoint = m.Header["X-Micro-Endpoint"] } else { // [ , a.package.Foo, Bar] parts := strings.Split(path, "/") @@ -37,7 +37,7 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { return errors.New("Unknown request path") } service := strings.Split(parts[1], ".") - m.Method = strings.Join([]string{service[len(service)-1], parts[2]}, ".") + m.Endpoint = strings.Join([]string{service[len(service)-1], parts[2]}, ".") m.Target = strings.Join(service[:len(service)-1], ".") } diff --git a/codec/jsonrpc/client.go b/codec/jsonrpc/client.go index f5ec5636..768e297d 100644 --- a/codec/jsonrpc/client.go +++ b/codec/jsonrpc/client.go @@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec { func (c *clientCodec) Write(m *codec.Message, b interface{}) error { c.Lock() - c.pending[m.Id] = m.Method + c.pending[m.Id] = m.Endpoint c.Unlock() - c.req.Method = m.Method + c.req.Method = m.Endpoint c.req.Params[0] = b c.req.ID = m.Id return c.enc.Encode(&c.req) @@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error { } c.Lock() - m.Method = c.pending[c.resp.ID] + m.Endpoint = c.pending[c.resp.ID] delete(c.pending, c.resp.ID) c.Unlock() diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go index 53f681ef..a56b3468 100644 --- a/codec/jsonrpc/server.go +++ b/codec/jsonrpc/server.go @@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error { if err := c.dec.Decode(&c.req); err != nil { return err } - m.Method = c.req.Method + m.Endpoint = c.req.Method m.Id = fmt.Sprintf("%v", c.req.ID) c.req.ID = nil return nil diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index de05552f..a7d8ba79 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { c.Lock() defer c.Unlock() // This is protobuf, of course we copy it. - pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)} + pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)} data, err := proto.Marshal(pbr) if err != nil { return err @@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { case codec.Response: c.Lock() defer c.Unlock() - rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error} + rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error} data, err := proto.Marshal(rtmp) if err != nil { return err @@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - m.Method = rtmp.GetServiceMethod() + m.Endpoint = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) case codec.Response: data, err := ReadNetString(c.rwc) @@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - m.Method = rtmp.GetServiceMethod() + m.Endpoint = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) m.Error = rtmp.GetError() case codec.Publication: diff --git a/server/rpc_codec.go b/server/rpc_codec.go index abceda03..b229ba11 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -106,14 +106,14 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { // set some internal things m.Target = m.Header["X-Micro-Service"] - m.Method = m.Header["X-Micro-Method"] + m.Endpoint = m.Header["X-Micro-Endpoint"] m.Id = m.Header["X-Micro-Id"] // read header via codec err := c.codec.ReadHeader(&m, codec.Request) // set the method/id - r.Method = m.Method + r.Endpoint = m.Endpoint r.Id = m.Id return err @@ -128,15 +128,15 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { // create a new message m := &codec.Message{ - Method: r.Method, - Id: r.Id, - Error: r.Error, - Type: r.Type, + Endpoint: r.Endpoint, + Id: r.Id, + Error: r.Error, + Type: r.Type, Header: map[string]string{ - "X-Micro-Id": r.Id, - "X-Micro-Method": r.Method, - "X-Micro-Error": r.Error, - "Content-Type": c.req.Header["Content-Type"], + "X-Micro-Id": r.Id, + "X-Micro-Endpoint": r.Endpoint, + "X-Micro-Error": r.Error, + "Content-Type": c.req.Header["Content-Type"], }, } diff --git a/server/rpc_codec_test.go b/server/rpc_codec_test.go index 59035e3e..1088af01 100644 --- a/server/rpc_codec_test.go +++ b/server/rpc_codec_test.go @@ -48,9 +48,9 @@ func TestCodecWriteError(t *testing.T) { } err := c.Write(&codec.Message{ - Method: "Service.Method", - Id: "0", - Error: "", + Endpoint: "Service.Endpoint", + Id: "0", + Error: "", }, "body") if err != nil { diff --git a/server/rpc_request.go b/server/rpc_request.go index cbc4179d..17ede91b 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -7,7 +7,7 @@ import ( type rpcRequest struct { service string - method string + endpoint string contentType string socket transport.Socket codec codec.Codec @@ -34,8 +34,8 @@ func (r *rpcRequest) Service() string { return r.service } -func (r *rpcRequest) Method() string { - return r.method +func (r *rpcRequest) Endpoint() string { + return r.endpoint } func (r *rpcRequest) Header() map[string]string { diff --git a/server/rpc_router.go b/server/rpc_router.go index 1043036b..782fb55b 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -171,7 +171,7 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte resp.msg = msg // Encode the response header - resp.msg.Method = req.msg.Method + resp.msg.Endpoint = req.msg.Endpoint if errmsg != "" { resp.msg.Error = errmsg reply = invalidRequest @@ -191,7 +191,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, r := &rpcRequest{ service: req.msg.Target, contentType: req.msg.Header["Content-Type"], - method: req.msg.Method, + endpoint: req.msg.Endpoint, body: req.msg.Body, } @@ -379,9 +379,9 @@ func (router *router) readHeader(cc codec.Reader) (service *service, mtype *meth // we can still recover and move on to the next request. keepReading = true - serviceMethod := strings.Split(req.msg.Method, ".") + serviceMethod := strings.Split(req.msg.Endpoint, ".") if len(serviceMethod) != 2 { - err = errors.New("rpc: service/method request ill-formed: " + req.msg.Method) + err = errors.New("rpc: service/method request ill-formed: " + req.msg.Endpoint) return } // Look up the request. @@ -389,12 +389,12 @@ func (router *router) readHeader(cc codec.Reader) (service *service, mtype *meth service = router.serviceMap[serviceMethod[0]] router.mu.Unlock() if service == nil { - err = errors.New("rpc: can't find service " + req.msg.Method) + err = errors.New("rpc: can't find service " + req.msg.Endpoint) return } mtype = service.method[serviceMethod[1]] if mtype == nil { - err = errors.New("rpc: can't find method " + req.msg.Method) + err = errors.New("rpc: can't find method " + req.msg.Endpoint) } return } diff --git a/server/rpc_server.go b/server/rpc_server.go index 4481d9bb..c1510905 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -115,7 +115,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // internal request request := &rpcRequest{ service: msg.Header["X-Micro-Service"], - method: msg.Header["X-Micro-Method"], + endpoint: msg.Header["X-Micro-Endpoint"], contentType: ct, codec: codec, header: msg.Header, diff --git a/server/rpc_stream.go b/server/rpc_stream.go index efbf0d55..2f74049a 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -31,9 +31,9 @@ func (r *rpcStream) Send(msg interface{}) error { defer r.Unlock() resp := codec.Message{ - Method: r.request.Method(), - Id: r.id, - Type: codec.Response, + Endpoint: r.request.Endpoint(), + Id: r.id, + Type: codec.Response, } return r.codec.Write(&resp, msg) diff --git a/server/server.go b/server/server.go index fbfacd0d..b4ddcaa9 100644 --- a/server/server.go +++ b/server/server.go @@ -45,8 +45,8 @@ type Message interface { type Request interface { // Service name requested Service() string - // Method name requested - Method() string + // Endpoint name requested + Endpoint() string // Content type provided ContentType() string // Header of the request @@ -83,7 +83,7 @@ type Stream interface { } // Handler interface represents a request handler. It's generated -// by passing any type of public concrete object with methods into server.NewHandler. +// by passing any type of public concrete object with endpoints into server.NewHandler. // Most will pass in a struct. // // Example: @@ -102,7 +102,7 @@ type Handler interface { } // Subscriber interface represents a subscription to a given topic using -// a specific subscriber function or object with methods. +// a specific subscriber function or object with endpoints. type Subscriber interface { Topic() string Subscriber() interface{} @@ -151,7 +151,7 @@ func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscr // NewHandler creates a new handler interface using the default server // Handlers are required to be a public object with public -// methods. Call to a service method such as Foo.Bar expects +// endpoints. Call to a service endpoint such as Foo.Bar expects // the type: // // type Foo struct {}