Rework client interface

This commit is contained in:
Asim Aslam 2018-04-14 18:06:52 +01:00
parent 07068379c6
commit c2cfe5310c
10 changed files with 87 additions and 127 deletions

View File

@ -11,7 +11,7 @@ func TestBackoff(t *testing.T) {
delta := time.Duration(0) delta := time.Duration(0)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i) d, err := exponentialBackoff(context.TODO(), NewRequest("test", "test", nil), i)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -12,20 +12,18 @@ import (
type Client interface { type Client interface {
Init(...Option) error Init(...Option) error
Options() Options Options() Options
NewPublication(topic string, msg interface{}) Publication NewMessage(topic string, msg interface{}) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string String() string
} }
// Publication is the interface for a message published asynchronously // Message is the interface for publishing asynchronously
type Publication interface { type Message interface {
Topic() string Topic() string
Message() interface{} Payload() interface{}
ContentType() string ContentType() string
} }
@ -91,8 +89,8 @@ func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer,
// Publishes a publication using the default client. Using the underlying broker // Publishes a publication using the default client. Using the underlying broker
// set within the options. // set within the options.
func Publish(ctx context.Context, p Publication) error { func Publish(ctx context.Context, msg Message) error {
return DefaultClient.Publish(ctx, p) return DefaultClient.Publish(ctx, msg)
} }
// Creates a new client with the options passed in // Creates a new client with the options passed in
@ -100,9 +98,9 @@ func NewClient(opt ...Option) Client {
return newRpcClient(opt...) return newRpcClient(opt...)
} }
// Creates a new publication using the default client // Creates a new message using the default client
func NewPublication(topic string, message interface{}) Publication { func NewMessage(topic string, message interface{}) Message {
return DefaultClient.NewPublication(topic, message) return DefaultClient.NewMessage(topic, message)
} }
// Creates a new request using the default client. Content Type will // Creates a new request using the default client. Content Type will
@ -110,17 +108,3 @@ func NewPublication(topic string, message interface{}) Publication {
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request, reqOpts...) return DefaultClient.NewRequest(service, method, request, reqOpts...)
} }
// Creates a new protobuf request using the default client
func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewProtoRequest(service, method, request, reqOpts...)
}
// Creates a new json request using the default client
func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewJsonRequest(service, method, request, reqOpts...)
}
func String() string {
return DefaultClient.String()
}

View File

@ -49,22 +49,14 @@ func (m *MockClient) Options() client.Options {
return m.Opts return m.Opts
} }
func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication { func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message {
return m.Client.NewPublication(topic, msg) return m.Client.NewMessage(topic, msg)
} }
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewRequest(service, method, req, reqOpts...) return m.Client.NewRequest(service, method, req, reqOpts...)
} }
func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewProtoRequest(service, method, req, reqOpts...)
}
func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewJsonRequest(service, method, req, reqOpts...)
}
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@ -97,38 +89,6 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
return fmt.Errorf("rpc: can't find service %s", req.Method()) return fmt.Errorf("rpc: can't find service %s", req.Method())
} }
func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock()
defer m.Unlock()
response, ok := m.Response[req.Service()]
if !ok {
return errors.NotFound("go.micro.client.mock", "service not found")
}
for _, r := range response {
if r.Method != req.Method() {
continue
}
if r.Error != nil {
return r.Error
}
v := reflect.ValueOf(rsp)
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
v.Set(reflect.ValueOf(r.Response))
return nil
}
return fmt.Errorf("rpc: can't find service %s", req.Method())
}
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@ -137,15 +97,7 @@ func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, nil return nil, nil
} }
func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) { func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
m.Lock()
defer m.Unlock()
// TODO: mock stream
return nil, nil
}
func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
return nil return nil
} }

View File

@ -21,7 +21,7 @@ func TestClient(t *testing.T) {
c := NewClient(Response("go.mock", response)) c := NewClient(Response("go.mock", response))
for _, r := range response { for _, r := range response {
req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"}) req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
var rsp interface{} var rsp interface{}
err := c.Call(context.TODO(), req, &rsp) err := c.Call(context.TODO(), req, &rsp)

View File

@ -68,6 +68,7 @@ type PublishOptions struct {
} }
type RequestOptions struct { type RequestOptions struct {
ContentType string
Stream bool Stream bool
// Other options for implementations of the interface // Other options for implementations of the interface
@ -290,6 +291,12 @@ func WithDialTimeout(d time.Duration) CallOption {
// Request Options // Request Options
func WithContentType(ct string) RequestOption {
return func(o *RequestOptions) {
o.ContentType = ct
}
}
func StreamingRequest() RequestOption { func StreamingRequest() RequestOption {
return func(o *RequestOptions) { return func(o *RequestOptions) {
o.Stream = true o.Stream = true

View File

@ -439,49 +439,38 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, grr return nil, grr
} }
func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error { func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
md, ok := metadata.FromContext(ctx) md, ok := metadata.FromContext(ctx)
if !ok { if !ok {
md = make(map[string]string) md = make(map[string]string)
} }
md["Content-Type"] = p.ContentType() md["Content-Type"] = msg.ContentType()
// encode message body // encode message body
cf, err := r.newCodec(p.ContentType()) cf, err := r.newCodec(msg.ContentType())
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
b := &buffer{bytes.NewBuffer(nil)} b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
r.once.Do(func() { r.once.Do(func() {
r.opts.Broker.Connect() r.opts.Broker.Connect()
}) })
return r.opts.Broker.Publish(p.Topic(), &broker.Message{ return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
Header: md, Header: md,
Body: b.Bytes(), Body: b.Bytes(),
}) })
} }
func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { func (r *rpcClient) NewMessage(topic string, message interface{}) Message {
return newRpcPublication(topic, message, r.opts.ContentType) return newMessage(topic, message, r.opts.ContentType)
} }
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream")
}
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...) return newRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
}
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json", reqOpts...)
} }
func (r *rpcClient) String() string { func (r *rpcClient) String() string {

27
client/rpc_message.go Normal file
View File

@ -0,0 +1,27 @@
package client
type message struct {
topic string
contentType string
payload interface{}
}
func newMessage(topic string, payload interface{}, contentType string) Message {
return &message{
payload: payload,
topic: topic,
contentType: contentType,
}
}
func (m *message) ContentType() string {
return m.contentType
}
func (m *message) Topic() string {
return m.topic
}
func (m *message) Payload() interface{} {
return m.payload
}

View File

@ -1,27 +0,0 @@
package client
type rpcPublication struct {
topic string
contentType string
message interface{}
}
func newRpcPublication(topic string, message interface{}, contentType string) Publication {
return &rpcPublication{
message: message,
topic: topic,
contentType: contentType,
}
}
func (r *rpcPublication) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
}

View File

@ -8,13 +8,18 @@ type rpcRequest struct {
opts RequestOptions opts RequestOptions
} }
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions var opts RequestOptions
for _, o := range reqOpts { for _, o := range reqOpts {
o(&opts) o(&opts)
} }
// set the content-type specified
if len(opts.ContentType) > 0 {
contentType = opts.ContentType
}
return &rpcRequest{ return &rpcRequest{
service: service, service: service,
method: method, method: method,

View File

@ -0,0 +1,23 @@
package client
import (
"testing"
)
func TestRequestOptions(t *testing.T) {
r := newRequest("service", "method", nil, "application/json")
if r.Service() != "service" {
t.Fatalf("expected 'service' got %s", r.Service())
}
if r.Method() != "method" {
t.Fatalf("expected 'method' got %s", r.Method())
}
if r.ContentType() != "application/json" {
t.Fatalf("expected 'method' got %s", r.ContentType())
}
r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf"))
if r2.ContentType() != "application/protobuf" {
t.Fatalf("expected 'method' got %s", r2.ContentType())
}
}