diff --git a/client/backoff_test.go b/client/backoff_test.go index 9b781491..11ba3980 100644 --- a/client/backoff_test.go +++ b/client/backoff_test.go @@ -11,7 +11,7 @@ func TestBackoff(t *testing.T) { delta := time.Duration(0) for i := 0; i < 5; i++ { - d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i) + d, err := exponentialBackoff(context.TODO(), NewRequest("test", "test", nil), i) if err != nil { t.Fatal(err) } diff --git a/client/client.go b/client/client.go index 4022d249..4d379ede 100644 --- a/client/client.go +++ b/client/client.go @@ -12,20 +12,18 @@ import ( type Client interface { Init(...Option) error Options() Options - NewPublication(topic string, msg interface{}) Publication + NewMessage(topic string, msg interface{}) Message NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request - NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request - NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) - Publish(ctx context.Context, p Publication, opts ...PublishOption) error + Publish(ctx context.Context, msg Message, opts ...PublishOption) error String() string } -// Publication is the interface for a message published asynchronously -type Publication interface { +// Message is the interface for publishing asynchronously +type Message interface { Topic() string - Message() interface{} + Payload() interface{} ContentType() string } @@ -91,8 +89,8 @@ func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, // Publishes a publication using the default client. Using the underlying broker // set within the options. -func Publish(ctx context.Context, p Publication) error { - return DefaultClient.Publish(ctx, p) +func Publish(ctx context.Context, msg Message) error { + return DefaultClient.Publish(ctx, msg) } // Creates a new client with the options passed in @@ -100,9 +98,9 @@ func NewClient(opt ...Option) Client { return newRpcClient(opt...) } -// Creates a new publication using the default client -func NewPublication(topic string, message interface{}) Publication { - return DefaultClient.NewPublication(topic, message) +// Creates a new message using the default client +func NewMessage(topic string, message interface{}) Message { + return DefaultClient.NewMessage(topic, message) } // Creates a new request using the default client. Content Type will @@ -110,17 +108,3 @@ func NewPublication(topic string, message interface{}) Publication { func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { return DefaultClient.NewRequest(service, method, request, reqOpts...) } - -// Creates a new protobuf request using the default client -func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return DefaultClient.NewProtoRequest(service, method, request, reqOpts...) -} - -// Creates a new json request using the default client -func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return DefaultClient.NewJsonRequest(service, method, request, reqOpts...) -} - -func String() string { - return DefaultClient.String() -} diff --git a/client/mock/mock.go b/client/mock/mock.go index c22dfad8..9182f3ba 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -49,22 +49,14 @@ func (m *MockClient) Options() client.Options { return m.Opts } -func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication { - return m.Client.NewPublication(topic, msg) +func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message { + return m.Client.NewMessage(topic, msg) } func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { return m.Client.NewRequest(service, method, req, reqOpts...) } -func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { - return m.Client.NewProtoRequest(service, method, req, reqOpts...) -} - -func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { - return m.Client.NewJsonRequest(service, method, req, reqOpts...) -} - func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { m.Lock() defer m.Unlock() @@ -97,38 +89,6 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface return fmt.Errorf("rpc: can't find service %s", req.Method()) } -func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error { - m.Lock() - defer m.Unlock() - - response, ok := m.Response[req.Service()] - if !ok { - return errors.NotFound("go.micro.client.mock", "service not found") - } - - for _, r := range response { - if r.Method != req.Method() { - continue - } - - if r.Error != nil { - return r.Error - } - - v := reflect.ValueOf(rsp) - - if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr { - v = reflect.Indirect(v) - } - - v.Set(reflect.ValueOf(r.Response)) - - return nil - } - - return fmt.Errorf("rpc: can't find service %s", req.Method()) -} - func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { m.Lock() defer m.Unlock() @@ -137,15 +97,7 @@ func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...cli return nil, nil } -func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) { - m.Lock() - defer m.Unlock() - - // TODO: mock stream - return nil, nil -} - -func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error { +func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { return nil } diff --git a/client/mock/mock_test.go b/client/mock/mock_test.go index 204b0b96..a13cef13 100644 --- a/client/mock/mock_test.go +++ b/client/mock/mock_test.go @@ -21,7 +21,7 @@ func TestClient(t *testing.T) { c := NewClient(Response("go.mock", response)) for _, r := range response { - req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) + req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) var rsp interface{} err := c.Call(context.TODO(), req, &rsp) diff --git a/client/options.go b/client/options.go index 5d7a0965..2d465850 100644 --- a/client/options.go +++ b/client/options.go @@ -68,7 +68,8 @@ type PublishOptions struct { } type RequestOptions struct { - Stream bool + ContentType string + Stream bool // Other options for implementations of the interface // can be stored in a context @@ -290,6 +291,12 @@ func WithDialTimeout(d time.Duration) CallOption { // Request Options +func WithContentType(ct string) RequestOption { + return func(o *RequestOptions) { + o.ContentType = ct + } +} + func StreamingRequest() RequestOption { return func(o *RequestOptions) { o.Stream = true diff --git a/client/rpc_client.go b/client/rpc_client.go index ad244731..82781d14 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -439,49 +439,38 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt return nil, grr } -func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error { +func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } - md["Content-Type"] = p.ContentType() + md["Content-Type"] = msg.ContentType() // encode message body - cf, err := r.newCodec(p.ContentType()) + cf, err := r.newCodec(msg.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } b := &buffer{bytes.NewBuffer(nil)} - if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } r.once.Do(func() { r.opts.Broker.Connect() }) - return r.opts.Broker.Publish(p.Topic(), &broker.Message{ + return r.opts.Broker.Publish(msg.Topic(), &broker.Message{ Header: md, Body: b.Bytes(), }) } -func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { - return newRpcPublication(topic, message, r.opts.ContentType) +func (r *rpcClient) NewMessage(topic string, message interface{}) Message { + return newMessage(topic, message, r.opts.ContentType) } -func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { - return newRpcPublication(topic, message, "application/octet-stream") -} func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...) -} - -func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...) -} - -func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, "application/json", reqOpts...) + return newRequest(service, method, request, r.opts.ContentType, reqOpts...) } func (r *rpcClient) String() string { diff --git a/client/rpc_message.go b/client/rpc_message.go new file mode 100644 index 00000000..13bd15c7 --- /dev/null +++ b/client/rpc_message.go @@ -0,0 +1,27 @@ +package client + +type message struct { + topic string + contentType string + payload interface{} +} + +func newMessage(topic string, payload interface{}, contentType string) Message { + return &message{ + payload: payload, + topic: topic, + contentType: contentType, + } +} + +func (m *message) ContentType() string { + return m.contentType +} + +func (m *message) Topic() string { + return m.topic +} + +func (m *message) Payload() interface{} { + return m.payload +} diff --git a/client/rpc_publication.go b/client/rpc_publication.go deleted file mode 100644 index 01a0073e..00000000 --- a/client/rpc_publication.go +++ /dev/null @@ -1,27 +0,0 @@ -package client - -type rpcPublication struct { - topic string - contentType string - message interface{} -} - -func newRpcPublication(topic string, message interface{}, contentType string) Publication { - return &rpcPublication{ - message: message, - topic: topic, - contentType: contentType, - } -} - -func (r *rpcPublication) ContentType() string { - return r.contentType -} - -func (r *rpcPublication) Topic() string { - return r.topic -} - -func (r *rpcPublication) Message() interface{} { - return r.message -} diff --git a/client/rpc_request.go b/client/rpc_request.go index ec50a76a..04ce490a 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -8,13 +8,18 @@ type rpcRequest struct { opts RequestOptions } -func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { +func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { var opts RequestOptions for _, o := range reqOpts { o(&opts) } + // set the content-type specified + if len(opts.ContentType) > 0 { + contentType = opts.ContentType + } + return &rpcRequest{ service: service, method: method, diff --git a/client/rpc_request_test.go b/client/rpc_request_test.go new file mode 100644 index 00000000..bca3dd2f --- /dev/null +++ b/client/rpc_request_test.go @@ -0,0 +1,23 @@ +package client + +import ( + "testing" +) + +func TestRequestOptions(t *testing.T) { + r := newRequest("service", "method", nil, "application/json") + if r.Service() != "service" { + t.Fatalf("expected 'service' got %s", r.Service()) + } + if r.Method() != "method" { + t.Fatalf("expected 'method' got %s", r.Method()) + } + if r.ContentType() != "application/json" { + t.Fatalf("expected 'method' got %s", r.ContentType()) + } + + r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf")) + if r2.ContentType() != "application/protobuf" { + t.Fatalf("expected 'method' got %s", r2.ContentType()) + } +}