From 07068379c6158c34585cbeb7824fa9405901f80c Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 16:16:58 +0100 Subject: [PATCH 1/8] remove remote func methods --- client/client.go | 12 --------- client/options.go | 9 +++++++ client/rpc_client.go | 52 +++++++++++++++++++-------------------- client/rpc_client_test.go | 46 ++++++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 39 deletions(-) diff --git a/client/client.go b/client/client.go index 8975694e..4022d249 100644 --- a/client/client.go +++ b/client/client.go @@ -17,9 +17,7 @@ type Client interface { NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error - CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) - StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error) Publish(ctx context.Context, p Publication, opts ...PublishOption) error String() string } @@ -85,22 +83,12 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca return DefaultClient.Call(ctx, request, response, opts...) } -// Makes a synchronous call to the specified address using the default client -func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { - return DefaultClient.CallRemote(ctx, address, request, response, opts...) -} - // Creates a streaming connection with a service and returns responses on the // channel passed in. It's up to the user to close the streamer. func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { return DefaultClient.Stream(ctx, request, opts...) } -// Creates a streaming connection to the address specified. -func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { - return DefaultClient.StreamRemote(ctx, address, request, opts...) -} - // Publishes a publication using the default client. Using the underlying broker // set within the options. func Publish(ctx context.Context, p Publication) error { diff --git a/client/options.go b/client/options.go index f806500b..5d7a0965 100644 --- a/client/options.go +++ b/client/options.go @@ -40,6 +40,8 @@ type Options struct { type CallOptions struct { SelectOptions []selector.SelectOption + // Address of remote host + Address string // Backoff func Backoff BackoffFunc // Check if retriable func @@ -226,6 +228,13 @@ func DialTimeout(d time.Duration) Option { // Call Options +// WithAddress sets the remote address to use rather than using service discovery +func WithAddress(a string) CallOption { + return func(o *CallOptions) { + o.Address = a + } +} + func WithSelectOption(so ...selector.SelectOption) CallOption { return func(o *CallOptions) { o.SelectOptions = append(o.SelectOptions, so...) diff --git a/client/rpc_client.go b/client/rpc_client.go index e251b1a4..ad244731 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -11,6 +11,7 @@ import ( "github.com/micro/go-micro/codec" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" "sync/atomic" @@ -213,13 +214,25 @@ func (r *rpcClient) Options() Options { return r.opts } -func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { - // make a copy of call opts - callOpts := r.opts.CallOptions - for _, opt := range opts { - opt(&callOpts) +func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { + // return remote address + if len(opts.Address) > 0 { + return func() (*registry.Node, error) { + return ®istry.Node{ + Address: opts.Address, + }, nil + }, nil } - return r.call(ctx, address, request, response, callOpts) + + // get next nodes from the selector + next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...) + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + return next, nil } func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { @@ -229,12 +242,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac opt(&callOpts) } - // get next nodes from the selector - next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + next, err := r.next(request, callOpts) + if err != nil { + return err } // check if we already have a deadline @@ -330,15 +340,6 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return gerr } -func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { - // make a copy of call opts - callOpts := r.opts.CallOptions - for _, opt := range opts { - opt(&callOpts) - } - return r.stream(ctx, address, request, callOpts) -} - func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { // make a copy of call opts callOpts := r.opts.CallOptions @@ -346,12 +347,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt opt(&callOpts) } - // get next nodes from the selector - next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...) - if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", err.Error()) - } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", err.Error()) + next, err := r.next(request, callOpts) + if err != nil { + return nil, err } // check if we already have a deadline diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index 7c3b9a98..9e5c87b4 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -10,6 +10,52 @@ import ( "github.com/micro/go-micro/selector" ) +func TestCallAddress(t *testing.T) { + var called bool + service := "test.service" + method := "Test.Method" + address := "10.1.10.1:8080" + + wrap := func(cf CallFunc) CallFunc { + return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { + called = true + + if req.Service() != service { + return fmt.Errorf("expected service: %s got %s", service, req.Service()) + } + + if req.Method() != method { + return fmt.Errorf("expected service: %s got %s", method, req.Method()) + } + + if addr != address { + return fmt.Errorf("expected address: %s got %s", address, addr) + } + + // don't do the call + return nil + } + } + + r := mock.NewRegistry() + c := NewClient( + Registry(r), + WrapCall(wrap), + ) + c.Options().Selector.Init(selector.Registry(r)) + + req := c.NewRequest(service, method, nil) + + // test calling remote address + if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { + t.Fatal("call with address error", err) + } + + if !called { + t.Fatal("wrapper not called") + } + +} func TestCallWrapper(t *testing.T) { var called bool id := "test.1" From c2cfe5310c9bf6f8b36e0734f15538ddba846809 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 18:06:52 +0100 Subject: [PATCH 2/8] Rework client interface --- client/backoff_test.go | 2 +- client/client.go | 36 +++++++------------------ client/mock/mock.go | 54 +++----------------------------------- client/mock/mock_test.go | 2 +- client/options.go | 9 ++++++- client/rpc_client.go | 27 ++++++------------- client/rpc_message.go | 27 +++++++++++++++++++ client/rpc_publication.go | 27 ------------------- client/rpc_request.go | 7 ++++- client/rpc_request_test.go | 23 ++++++++++++++++ 10 files changed, 87 insertions(+), 127 deletions(-) create mode 100644 client/rpc_message.go delete mode 100644 client/rpc_publication.go create mode 100644 client/rpc_request_test.go diff --git a/client/backoff_test.go b/client/backoff_test.go index 9b781491..11ba3980 100644 --- a/client/backoff_test.go +++ b/client/backoff_test.go @@ -11,7 +11,7 @@ func TestBackoff(t *testing.T) { delta := time.Duration(0) for i := 0; i < 5; i++ { - d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i) + d, err := exponentialBackoff(context.TODO(), NewRequest("test", "test", nil), i) if err != nil { t.Fatal(err) } diff --git a/client/client.go b/client/client.go index 4022d249..4d379ede 100644 --- a/client/client.go +++ b/client/client.go @@ -12,20 +12,18 @@ import ( type Client interface { Init(...Option) error Options() Options - NewPublication(topic string, msg interface{}) Publication + NewMessage(topic string, msg interface{}) Message NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request - NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request - NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) - Publish(ctx context.Context, p Publication, opts ...PublishOption) error + Publish(ctx context.Context, msg Message, opts ...PublishOption) error String() string } -// Publication is the interface for a message published asynchronously -type Publication interface { +// Message is the interface for publishing asynchronously +type Message interface { Topic() string - Message() interface{} + Payload() interface{} ContentType() string } @@ -91,8 +89,8 @@ func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, // Publishes a publication using the default client. Using the underlying broker // set within the options. -func Publish(ctx context.Context, p Publication) error { - return DefaultClient.Publish(ctx, p) +func Publish(ctx context.Context, msg Message) error { + return DefaultClient.Publish(ctx, msg) } // Creates a new client with the options passed in @@ -100,9 +98,9 @@ func NewClient(opt ...Option) Client { return newRpcClient(opt...) } -// Creates a new publication using the default client -func NewPublication(topic string, message interface{}) Publication { - return DefaultClient.NewPublication(topic, message) +// Creates a new message using the default client +func NewMessage(topic string, message interface{}) Message { + return DefaultClient.NewMessage(topic, message) } // Creates a new request using the default client. Content Type will @@ -110,17 +108,3 @@ func NewPublication(topic string, message interface{}) Publication { func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { return DefaultClient.NewRequest(service, method, request, reqOpts...) } - -// Creates a new protobuf request using the default client -func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return DefaultClient.NewProtoRequest(service, method, request, reqOpts...) -} - -// Creates a new json request using the default client -func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return DefaultClient.NewJsonRequest(service, method, request, reqOpts...) -} - -func String() string { - return DefaultClient.String() -} diff --git a/client/mock/mock.go b/client/mock/mock.go index c22dfad8..9182f3ba 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -49,22 +49,14 @@ func (m *MockClient) Options() client.Options { return m.Opts } -func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication { - return m.Client.NewPublication(topic, msg) +func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message { + return m.Client.NewMessage(topic, msg) } func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return m.Client.NewRequest(service, method, req, reqOpts...) } -func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { - return m.Client.NewProtoRequest(service, method, req, reqOpts...) -} - -func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { - return m.Client.NewJsonRequest(service, method, req, reqOpts...) -} - func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { m.Lock() defer m.Unlock() @@ -97,38 +89,6 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface return fmt.Errorf("rpc: can't find service %s", req.Method()) } -func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error { - m.Lock() - defer m.Unlock() - - response, ok := m.Response[req.Service()] - if !ok { - return errors.NotFound("go.micro.client.mock", "service not found") - } - - for _, r := range response { - if r.Method != req.Method() { - continue - } - - if r.Error != nil { - return r.Error - } - - v := reflect.ValueOf(rsp) - - if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr { - v = reflect.Indirect(v) - } - - v.Set(reflect.ValueOf(r.Response)) - - return nil - } - - return fmt.Errorf("rpc: can't find service %s", req.Method()) -} - func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { m.Lock() defer m.Unlock() @@ -137,15 +97,7 @@ func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...cli return nil, nil } -func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) { - m.Lock() - defer m.Unlock() - - // TODO: mock stream - return nil, nil -} - -func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error { +func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { return nil } diff --git a/client/mock/mock_test.go b/client/mock/mock_test.go index 204b0b96..a13cef13 100644 --- a/client/mock/mock_test.go +++ b/client/mock/mock_test.go @@ -21,7 +21,7 @@ func TestClient(t *testing.T) { c := NewClient(Response("go.mock", response)) for _, r := range response { - req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) + req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) var rsp interface{} err := c.Call(context.TODO(), req, &rsp) diff --git a/client/options.go b/client/options.go index 5d7a0965..2d465850 100644 --- a/client/options.go +++ b/client/options.go @@ -68,7 +68,8 @@ type PublishOptions struct { } type RequestOptions struct { - Stream bool + ContentType string + Stream bool // Other options for implementations of the interface // can be stored in a context @@ -290,6 +291,12 @@ func WithDialTimeout(d time.Duration) CallOption { // Request Options +func WithContentType(ct string) RequestOption { + return func(o *RequestOptions) { + o.ContentType = ct + } +} + func StreamingRequest() RequestOption { return func(o *RequestOptions) { o.Stream = true diff --git a/client/rpc_client.go b/client/rpc_client.go index ad244731..82781d14 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -439,49 +439,38 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, grr } -func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error { +func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } - md["Content-Type"] = p.ContentType() + md["Content-Type"] = msg.ContentType() // encode message body - cf, err := r.newCodec(p.ContentType()) + cf, err := r.newCodec(msg.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } b := &buffer{bytes.NewBuffer(nil)} - if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } r.once.Do(func() { r.opts.Broker.Connect() }) - return r.opts.Broker.Publish(p.Topic(), &broker.Message{ + return r.opts.Broker.Publish(msg.Topic(), &broker.Message{ Header: md, Body: b.Bytes(), }) } -func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { - return newRpcPublication(topic, message, r.opts.ContentType) +func (r *rpcClient) NewMessage(topic string, message interface{}) Message { + return newMessage(topic, message, r.opts.ContentType) } -func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { - return newRpcPublication(topic, message, "application/octet-stream") -} func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...) -} - -func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...) -} - -func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, "application/json", reqOpts...) + return newRequest(service, method, request, r.opts.ContentType, reqOpts...) } func (r *rpcClient) String() string { diff --git a/client/rpc_message.go b/client/rpc_message.go new file mode 100644 index 00000000..13bd15c7 --- /dev/null +++ b/client/rpc_message.go @@ -0,0 +1,27 @@ +package client + +type message struct { + topic string + contentType string + payload interface{} +} + +func newMessage(topic string, payload interface{}, contentType string) Message { + return &message{ + payload: payload, + topic: topic, + contentType: contentType, + } +} + +func (m *message) ContentType() string { + return m.contentType +} + +func (m *message) Topic() string { + return m.topic +} + +func (m *message) Payload() interface{} { + return m.payload +} diff --git a/client/rpc_publication.go b/client/rpc_publication.go deleted file mode 100644 index 01a0073e..00000000 --- a/client/rpc_publication.go +++ /dev/null @@ -1,27 +0,0 @@ -package client - -type rpcPublication struct { - topic string - contentType string - message interface{} -} - -func newRpcPublication(topic string, message interface{}, contentType string) Publication { - return &rpcPublication{ - message: message, - topic: topic, - contentType: contentType, - } -} - -func (r *rpcPublication) ContentType() string { - return r.contentType -} - -func (r *rpcPublication) Topic() string { - return r.topic -} - -func (r *rpcPublication) Message() interface{} { - return r.message -} diff --git a/client/rpc_request.go b/client/rpc_request.go index ec50a76a..04ce490a 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -8,13 +8,18 @@ type rpcRequest struct { opts RequestOptions } -func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { +func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { var opts RequestOptions for _, o := range reqOpts { o(&opts) } + // set the content-type specified + if len(opts.ContentType) > 0 { + contentType = opts.ContentType + } + return &rpcRequest{ service: service, method: method, diff --git a/client/rpc_request_test.go b/client/rpc_request_test.go new file mode 100644 index 00000000..bca3dd2f --- /dev/null +++ b/client/rpc_request_test.go @@ -0,0 +1,23 @@ +package client + +import ( + "testing" +) + +func TestRequestOptions(t *testing.T) { + r := newRequest("service", "method", nil, "application/json") + if r.Service() != "service" { + t.Fatalf("expected 'service' got %s", r.Service()) + } + if r.Method() != "method" { + t.Fatalf("expected 'method' got %s", r.Method()) + } + if r.ContentType() != "application/json" { + t.Fatalf("expected 'method' got %s", r.ContentType()) + } + + r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf")) + if r2.ContentType() != "application/protobuf" { + t.Fatalf("expected 'method' got %s", r2.ContentType()) + } +} From 65068e8b828f597cfbb565591ad96b99d0b1a7f1 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 18:15:09 +0100 Subject: [PATCH 3/8] rename Streamer to Stream --- client/backoff_test.go | 4 +- client/client.go | 34 ++----------- client/client_wrapper.go | 2 +- client/mock/mock.go | 2 +- client/rpc_client.go | 8 +-- publisher.go | 2 +- server/rpc_service.go | 4 +- server/server.go | 103 +-------------------------------------- server/server_wrapper.go | 4 +- wrapper.go | 4 +- 10 files changed, 21 insertions(+), 146 deletions(-) diff --git a/client/backoff_test.go b/client/backoff_test.go index 11ba3980..60ae19aa 100644 --- a/client/backoff_test.go +++ b/client/backoff_test.go @@ -10,8 +10,10 @@ import ( func TestBackoff(t *testing.T) { delta := time.Duration(0) + c := NewClient() + for i := 0; i < 5; i++ { - d, err := exponentialBackoff(context.TODO(), NewRequest("test", "test", nil), i) + d, err := exponentialBackoff(context.TODO(), c.NewRequest("test", "test", nil), i) if err != nil { t.Fatal(err) } diff --git a/client/client.go b/client/client.go index 4d379ede..f2297895 100644 --- a/client/client.go +++ b/client/client.go @@ -15,7 +15,7 @@ type Client interface { NewMessage(topic string, msg interface{}) Message NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error - Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) + Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Publish(ctx context.Context, msg Message, opts ...PublishOption) error String() string } @@ -37,8 +37,8 @@ type Request interface { Stream() bool } -// Streamer is the inteface for a bidirectional synchronous stream -type Streamer interface { +// Stream is the inteface for a bidirectional synchronous stream +type Stream interface { Context() context.Context Request() Request Send(interface{}) error @@ -76,35 +76,7 @@ var ( DefaultPoolTTL = time.Minute ) -// Makes a synchronous call to a service using the default client -func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { - return DefaultClient.Call(ctx, request, response, opts...) -} - -// Creates a streaming connection with a service and returns responses on the -// channel passed in. It's up to the user to close the streamer. -func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { - return DefaultClient.Stream(ctx, request, opts...) -} - -// Publishes a publication using the default client. Using the underlying broker -// set within the options. -func Publish(ctx context.Context, msg Message) error { - return DefaultClient.Publish(ctx, msg) -} - // Creates a new client with the options passed in func NewClient(opt ...Option) Client { return newRpcClient(opt...) } - -// Creates a new message using the default client -func NewMessage(topic string, message interface{}) Message { - return DefaultClient.NewMessage(topic, message) -} - -// Creates a new request using the default client. Content Type will -// be set to the default within options and use the appropriate codec -func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return DefaultClient.NewRequest(service, method, request, reqOpts...) -} diff --git a/client/client_wrapper.go b/client/client_wrapper.go index e8b87865..aacb5ae5 100644 --- a/client/client_wrapper.go +++ b/client/client_wrapper.go @@ -14,4 +14,4 @@ type CallWrapper func(CallFunc) CallFunc type Wrapper func(Client) Client // StreamWrapper wraps a Stream and returns the equivalent -type StreamWrapper func(Streamer) Streamer +type StreamWrapper func(Stream) Stream diff --git a/client/mock/mock.go b/client/mock/mock.go index 9182f3ba..c22d84d6 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -89,7 +89,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface return fmt.Errorf("rpc: can't find service %s", req.Method()) } -func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { +func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { m.Lock() defer m.Unlock() diff --git a/client/rpc_client.go b/client/rpc_client.go index 82781d14..b71b6c91 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -135,7 +135,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp } } -func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) { +func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) { msg := &transport.Message{ Header: make(map[string]string), } @@ -340,7 +340,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return gerr } -func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { +func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) { // make a copy of call opts callOpts := r.opts.CallOptions for _, opt := range opts { @@ -371,7 +371,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt default: } - call := func(i int) (Streamer, error) { + call := func(i int) (Stream, error) { // call backoff first. Someone may want an initial start delay t, err := callOpts.Backoff(ctx, request, i) if err != nil { @@ -401,7 +401,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt } type response struct { - stream Streamer + stream Stream err error } diff --git a/publisher.go b/publisher.go index 71290a0f..ca81469c 100644 --- a/publisher.go +++ b/publisher.go @@ -12,5 +12,5 @@ type publisher struct { } func (p *publisher) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error { - return p.c.Publish(ctx, p.c.NewPublication(p.topic, msg)) + return p.c.Publish(ctx, p.c.NewMessage(p.topic, msg)) } diff --git a/server/rpc_service.go b/server/rpc_service.go index f90cb18e..776f4441 100644 --- a/server/rpc_service.go +++ b/server/rpc_service.go @@ -119,9 +119,9 @@ func prepareMethod(method reflect.Method) *methodType { if stream { // check stream type - streamType := reflect.TypeOf((*Streamer)(nil)).Elem() + streamType := reflect.TypeOf((*Stream)(nil)).Elem() if !argType.Implements(streamType) { - log.Log(mname, "argument does not implement Streamer interface:", argType) + log.Log(mname, "argument does not implement Stream interface:", argType) return nil } } else { diff --git a/server/server.go b/server/server.go index 5311c238..f2f76f3c 100644 --- a/server/server.go +++ b/server/server.go @@ -3,11 +3,7 @@ package server import ( "context" - "os" - "os/signal" - "syscall" - "github.com/micro/go-log" "github.com/pborman/uuid" ) @@ -40,11 +36,11 @@ type Request interface { Stream() bool } -// Streamer represents a stream established with a client. +// Stream represents a stream established with a client. // A stream can be bidirectional which is indicated by the request. // The last error will be left in Error(). // EOF indicated end of the stream. -type Streamer interface { +type Stream interface { Context() context.Context Request() Request Send(interface{}) error @@ -67,102 +63,7 @@ var ( DefaultServer Server = newRpcServer() ) -// DefaultOptions returns config options for the default service -func DefaultOptions() Options { - return DefaultServer.Options() -} - -// Init initialises the default server with options passed in -func Init(opt ...Option) { - if DefaultServer == nil { - DefaultServer = newRpcServer(opt...) - } - DefaultServer.Init(opt...) -} - // NewServer returns a new server with options passed in func NewServer(opt ...Option) Server { return newRpcServer(opt...) } - -// NewSubscriber creates a new subscriber interface with the given topic -// and handler using the default server -func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber { - return DefaultServer.NewSubscriber(topic, h, opts...) -} - -// NewHandler creates a new handler interface using the default server -// Handlers are required to be a public object with public -// methods. Call to a service method such as Foo.Bar expects -// the type: -// -// type Foo struct {} -// func (f *Foo) Bar(ctx, req, rsp) error { -// return nil -// } -// -func NewHandler(h interface{}, opts ...HandlerOption) Handler { - return DefaultServer.NewHandler(h, opts...) -} - -// Handle registers a handler interface with the default server to -// handle inbound requests -func Handle(h Handler) error { - return DefaultServer.Handle(h) -} - -// Subscribe registers a subscriber interface with the default server -// which subscribes to specified topic with the broker -func Subscribe(s Subscriber) error { - return DefaultServer.Subscribe(s) -} - -// Register registers the default server with the discovery system -func Register() error { - return DefaultServer.Register() -} - -// Deregister deregisters the default server from the discovery system -func Deregister() error { - return DefaultServer.Deregister() -} - -// Run starts the default server and waits for a kill -// signal before exiting. Also registers/deregisters the server -func Run() error { - if err := Start(); err != nil { - return err - } - - if err := DefaultServer.Register(); err != nil { - return err - } - - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) - log.Logf("Received signal %s", <-ch) - - if err := DefaultServer.Deregister(); err != nil { - return err - } - - return Stop() -} - -// Start starts the default server -func Start() error { - config := DefaultServer.Options() - log.Logf("Starting server %s id %s", config.Name, config.Id) - return DefaultServer.Start() -} - -// Stop stops the default server -func Stop() error { - log.Logf("Stopping server") - return DefaultServer.Stop() -} - -// String returns name of Server implementation -func String() string { - return DefaultServer.String() -} diff --git a/server/server_wrapper.go b/server/server_wrapper.go index 2ff2737c..e90ae338 100644 --- a/server/server_wrapper.go +++ b/server/server_wrapper.go @@ -20,8 +20,8 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent type SubscriberWrapper func(SubscriberFunc) SubscriberFunc -// StreamerWrapper wraps a Streamer interface and returns the equivalent. +// StreamWrapper wraps a Stream interface and returns the equivalent. // Because streams exist for the lifetime of a method invocation this // is a convenient way to wrap a Stream as its in use for trace, monitoring, // metrics, etc. -type StreamerWrapper func(Streamer) Streamer +type StreamWrapper func(Stream) Stream diff --git a/wrapper.go b/wrapper.go index b0139efe..b7c59f04 100644 --- a/wrapper.go +++ b/wrapper.go @@ -36,12 +36,12 @@ func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interf return c.Client.Call(ctx, req, rsp, opts...) } -func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { +func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { ctx = c.setHeaders(ctx) return c.Client.Stream(ctx, req, opts...) } -func (c *clientWrapper) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error { +func (c *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { ctx = c.setHeaders(ctx) return c.Client.Publish(ctx, p, opts...) } From d00d76bf7cfb622830da458d1fd4fa85010aab75 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 18:21:02 +0100 Subject: [PATCH 4/8] Move publication to message --- function.go | 2 +- server/rpc_request.go | 12 ++++++------ server/server.go | 4 ++-- server/server_wrapper.go | 2 +- server/subscriber.go | 8 ++++---- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/function.go b/function.go index 97cf17aa..4379a4b3 100644 --- a/function.go +++ b/function.go @@ -23,7 +23,7 @@ func fnHandlerWrapper(f Function) server.HandlerWrapper { func fnSubWrapper(f Function) server.SubscriberWrapper { return func(s server.SubscriberFunc) server.SubscriberFunc { - return func(ctx context.Context, msg server.Publication) error { + return func(ctx context.Context, msg server.Message) error { defer f.Done() return s(ctx, msg) } diff --git a/server/rpc_request.go b/server/rpc_request.go index 5dcac361..b96286c2 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -8,10 +8,10 @@ type rpcRequest struct { stream bool } -type rpcPublication struct { +type rpcMessage struct { topic string contentType string - message interface{} + payload interface{} } func (r *rpcRequest) ContentType() string { @@ -34,14 +34,14 @@ func (r *rpcRequest) Stream() bool { return r.stream } -func (r *rpcPublication) ContentType() string { +func (r *rpcMessage) ContentType() string { return r.contentType } -func (r *rpcPublication) Topic() string { +func (r *rpcMessage) Topic() string { return r.topic } -func (r *rpcPublication) Message() interface{} { - return r.message +func (r *rpcMessage) Payload() interface{} { + return r.payload } diff --git a/server/server.go b/server/server.go index f2f76f3c..9b9f2ba4 100644 --- a/server/server.go +++ b/server/server.go @@ -21,9 +21,9 @@ type Server interface { String() string } -type Publication interface { +type Message interface { Topic() string - Message() interface{} + Payload() interface{} ContentType() string } diff --git a/server/server_wrapper.go b/server/server_wrapper.go index e90ae338..3e4d3ecd 100644 --- a/server/server_wrapper.go +++ b/server/server_wrapper.go @@ -12,7 +12,7 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error // SubscriberFunc represents a single method of a subscriber. It's used primarily // for the wrappers. What's handed to the actual method is the concrete // publication message. -type SubscriberFunc func(ctx context.Context, msg Publication) error +type SubscriberFunc func(ctx context.Context, msg Message) error // HandlerWrapper wraps the HandlerFunc and returns the equivalent type HandlerWrapper func(HandlerFunc) HandlerFunc diff --git a/server/subscriber.go b/server/subscriber.go index d69d3620..0af2457b 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -204,7 +204,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle return err } - fn := func(ctx context.Context, msg Publication) error { + fn := func(ctx context.Context, msg Message) error { var vals []reflect.Value if sb.typ.Kind() != reflect.Func { vals = append(vals, sb.rcvr) @@ -213,7 +213,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle vals = append(vals, reflect.ValueOf(ctx)) } - vals = append(vals, reflect.ValueOf(msg.Message())) + vals = append(vals, reflect.ValueOf(msg.Payload())) returnValues := handler.method.Call(vals) if err := returnValues[0].Interface(); err != nil { @@ -229,10 +229,10 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle s.wg.Add(1) go func() { defer s.wg.Done() - fn(ctx, &rpcPublication{ + fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, - message: req.Interface(), + payload: req.Interface(), }) }() } From 173f7107e2212d1962ee29415f801ad19c22738a Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 18:26:54 +0100 Subject: [PATCH 5/8] remove broker default funcs --- broker/broker.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 39db9f17..d1dccaeb 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -44,27 +44,3 @@ var ( func NewBroker(opts ...Option) Broker { return newHttpBroker(opts...) } - -func Init(opts ...Option) error { - return DefaultBroker.Init(opts...) -} - -func Connect() error { - return DefaultBroker.Connect() -} - -func Disconnect() error { - return DefaultBroker.Disconnect() -} - -func Publish(topic string, msg *Message, opts ...PublishOption) error { - return DefaultBroker.Publish(topic, msg, opts...) -} - -func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { - return DefaultBroker.Subscribe(topic, handler, opts...) -} - -func String() string { - return DefaultBroker.String() -} From d00ac200dd236ae87e7d4323a2ce51c658decc90 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 18:43:54 +0100 Subject: [PATCH 6/8] remove registry and transport default funcs --- registry/registry.go | 29 ----------------------------- transport/transport.go | 12 ------------ 2 files changed, 41 deletions(-) diff --git a/registry/registry.go b/registry/registry.go index 489517f5..6225be38 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -33,32 +33,3 @@ var ( func NewRegistry(opts ...Option) Registry { return newConsulRegistry(opts...) } - -// Register a service node. Additionally supply options such as TTL. -func Register(s *Service, opts ...RegisterOption) error { - return DefaultRegistry.Register(s, opts...) -} - -// Deregister a service node -func Deregister(s *Service) error { - return DefaultRegistry.Deregister(s) -} - -// Retrieve a service. A slice is returned since we separate Name/Version. -func GetService(name string) ([]*Service, error) { - return DefaultRegistry.GetService(name) -} - -// List the services. Only returns service names -func ListServices() ([]*Service, error) { - return DefaultRegistry.ListServices() -} - -// Watch returns a watcher which allows you to track updates to the registry. -func Watch(opts ...WatchOption) (Watcher, error) { - return DefaultRegistry.Watch(opts...) -} - -func String() string { - return DefaultRegistry.String() -} diff --git a/transport/transport.go b/transport/transport.go index 8f3e0879..383d5b6f 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -50,15 +50,3 @@ var ( func NewTransport(opts ...Option) Transport { return newHTTPTransport(opts...) } - -func Dial(addr string, opts ...DialOption) (Client, error) { - return DefaultTransport.Dial(addr, opts...) -} - -func Listen(addr string, opts ...ListenOption) (Listener, error) { - return DefaultTransport.Listen(addr, opts...) -} - -func String() string { - return DefaultTransport.String() -} From 19fdfba0bf62912fb40ea8b1ae645eb14835adfc Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 19:24:17 +0100 Subject: [PATCH 7/8] move wrapper files --- client/{client_wrapper.go => wrapper.go} | 0 server/{server_wrapper.go => wrapper.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename client/{client_wrapper.go => wrapper.go} (100%) rename server/{server_wrapper.go => wrapper.go} (100%) diff --git a/client/client_wrapper.go b/client/wrapper.go similarity index 100% rename from client/client_wrapper.go rename to client/wrapper.go diff --git a/server/server_wrapper.go b/server/wrapper.go similarity index 100% rename from server/server_wrapper.go rename to server/wrapper.go From 0315b4480f5b49778e6506ebd9d0de1873ef3090 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 17 Apr 2018 11:00:22 +0100 Subject: [PATCH 8/8] revert some changes --- broker/broker.go | 24 +++++++++++ client/client.go | 32 ++++++++++++++ registry/registry.go | 29 +++++++++++++ server/server.go | 99 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 184 insertions(+) diff --git a/broker/broker.go b/broker/broker.go index d1dccaeb..39db9f17 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -44,3 +44,27 @@ var ( func NewBroker(opts ...Option) Broker { return newHttpBroker(opts...) } + +func Init(opts ...Option) error { + return DefaultBroker.Init(opts...) +} + +func Connect() error { + return DefaultBroker.Connect() +} + +func Disconnect() error { + return DefaultBroker.Disconnect() +} + +func Publish(topic string, msg *Message, opts ...PublishOption) error { + return DefaultBroker.Publish(topic, msg, opts...) +} + +func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + return DefaultBroker.Subscribe(topic, handler, opts...) +} + +func String() string { + return DefaultBroker.String() +} diff --git a/client/client.go b/client/client.go index f2297895..aa2178d3 100644 --- a/client/client.go +++ b/client/client.go @@ -76,7 +76,39 @@ var ( DefaultPoolTTL = time.Minute ) +// Makes a synchronous call to a service using the default client +func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { + return DefaultClient.Call(ctx, request, response, opts...) +} + +// Publishes a publication using the default client. Using the underlying broker +// set within the options. +func Publish(ctx context.Context, msg Message) error { + return DefaultClient.Publish(ctx, msg) +} + +// Creates a new message using the default client +func NewMessage(topic string, payload interface{}) Message { + return DefaultClient.NewMessage(topic, payload) +} + // Creates a new client with the options passed in func NewClient(opt ...Option) Client { return newRpcClient(opt...) } + +// Creates a new request using the default client. Content Type will +// be set to the default within options and use the appropriate codec +func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return DefaultClient.NewRequest(service, method, request, reqOpts...) +} + +// Creates a streaming connection with a service and returns responses on the +// channel passed in. It's up to the user to close the streamer. +func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) { + return DefaultClient.Stream(ctx, request, opts...) +} + +func String() string { + return DefaultClient.String() +} diff --git a/registry/registry.go b/registry/registry.go index 6225be38..489517f5 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -33,3 +33,32 @@ var ( func NewRegistry(opts ...Option) Registry { return newConsulRegistry(opts...) } + +// Register a service node. Additionally supply options such as TTL. +func Register(s *Service, opts ...RegisterOption) error { + return DefaultRegistry.Register(s, opts...) +} + +// Deregister a service node +func Deregister(s *Service) error { + return DefaultRegistry.Deregister(s) +} + +// Retrieve a service. A slice is returned since we separate Name/Version. +func GetService(name string) ([]*Service, error) { + return DefaultRegistry.GetService(name) +} + +// List the services. Only returns service names +func ListServices() ([]*Service, error) { + return DefaultRegistry.ListServices() +} + +// Watch returns a watcher which allows you to track updates to the registry. +func Watch(opts ...WatchOption) (Watcher, error) { + return DefaultRegistry.Watch(opts...) +} + +func String() string { + return DefaultRegistry.String() +} diff --git a/server/server.go b/server/server.go index 9b9f2ba4..a320fe8f 100644 --- a/server/server.go +++ b/server/server.go @@ -3,7 +3,11 @@ package server import ( "context" + "os" + "os/signal" + "syscall" + "github.com/micro/go-log" "github.com/pborman/uuid" ) @@ -63,7 +67,102 @@ var ( DefaultServer Server = newRpcServer() ) +// DefaultOptions returns config options for the default service +func DefaultOptions() Options { + return DefaultServer.Options() +} + +// Init initialises the default server with options passed in +func Init(opt ...Option) { + if DefaultServer == nil { + DefaultServer = newRpcServer(opt...) + } + DefaultServer.Init(opt...) +} + // NewServer returns a new server with options passed in func NewServer(opt ...Option) Server { return newRpcServer(opt...) } + +// NewSubscriber creates a new subscriber interface with the given topic +// and handler using the default server +func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber { + return DefaultServer.NewSubscriber(topic, h, opts...) +} + +// NewHandler creates a new handler interface using the default server +// Handlers are required to be a public object with public +// methods. Call to a service method such as Foo.Bar expects +// the type: +// +// type Foo struct {} +// func (f *Foo) Bar(ctx, req, rsp) error { +// return nil +// } +// +func NewHandler(h interface{}, opts ...HandlerOption) Handler { + return DefaultServer.NewHandler(h, opts...) +} + +// Handle registers a handler interface with the default server to +// handle inbound requests +func Handle(h Handler) error { + return DefaultServer.Handle(h) +} + +// Subscribe registers a subscriber interface with the default server +// which subscribes to specified topic with the broker +func Subscribe(s Subscriber) error { + return DefaultServer.Subscribe(s) +} + +// Register registers the default server with the discovery system +func Register() error { + return DefaultServer.Register() +} + +// Deregister deregisters the default server from the discovery system +func Deregister() error { + return DefaultServer.Deregister() +} + +// Run starts the default server and waits for a kill +// signal before exiting. Also registers/deregisters the server +func Run() error { + if err := Start(); err != nil { + return err + } + + if err := DefaultServer.Register(); err != nil { + return err + } + + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + log.Logf("Received signal %s", <-ch) + + if err := DefaultServer.Deregister(); err != nil { + return err + } + + return Stop() +} + +// Start starts the default server +func Start() error { + config := DefaultServer.Options() + log.Logf("Starting server %s id %s", config.Name, config.Id) + return DefaultServer.Start() +} + +// Stop stops the default server +func Stop() error { + log.Logf("Stopping server") + return DefaultServer.Stop() +} + +// String returns name of Server implementation +func String() string { + return DefaultServer.String() +}