From 02985c35d5438f16a0462825a54dd64cf687b859 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 8 Dec 2015 19:25:42 +0000 Subject: [PATCH] Add call and publish options --- client/client.go | 28 +++++++++++++++------------- client/options.go | 4 ++++ client/rpc_client.go | 11 +++++------ examples/client/wrapper/wrapper.go | 4 ++-- 4 files changed, 26 insertions(+), 21 deletions(-) diff --git a/client/client.go b/client/client.go index 03b79342..ad117412 100644 --- a/client/client.go +++ b/client/client.go @@ -30,11 +30,11 @@ type Client interface { NewRequest(service, method string, req interface{}) Request NewProtoRequest(service, method string, req interface{}) Request NewJsonRequest(service, method string, req interface{}) Request - Call(ctx context.Context, req Request, rsp interface{}) error - CallRemote(ctx context.Context, addr string, req Request, rsp interface{}) error - Stream(ctx context.Context, req Request, rspChan interface{}) (Streamer, error) - StreamRemote(ctx context.Context, addr string, req Request, rspChan interface{}) (Streamer, error) - Publish(ctx context.Context, p Publication) error + Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error + CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error + Stream(ctx context.Context, req Request, rspChan interface{}, opts ...CallOption) (Streamer, error) + StreamRemote(ctx context.Context, addr string, req Request, rspChan interface{}, opts ...CallOption) (Streamer, error) + Publish(ctx context.Context, p Publication, opts ...PublishOption) error } type Publication interface { @@ -57,30 +57,32 @@ type Streamer interface { } type Option func(*options) +type CallOption func(*callOptions) +type PublishOption func(*publishOptions) var ( DefaultClient Client = newRpcClient() ) // Makes a synchronous call to a service using the default client -func Call(ctx context.Context, request Request, response interface{}) error { - return DefaultClient.Call(ctx, request, response) +func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { + return DefaultClient.Call(ctx, request, response, opts...) } // Makes a synchronous call to the specified address using the default client -func CallRemote(ctx context.Context, address string, request Request, response interface{}) error { - return DefaultClient.CallRemote(ctx, address, request, response) +func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { + return DefaultClient.CallRemote(ctx, address, request, response, opts...) } // Creates a streaming connection with a service and returns responses on the // channel passed in. It's upto the user to close the streamer. -func Stream(ctx context.Context, request Request, responseChan interface{}) (Streamer, error) { - return DefaultClient.Stream(ctx, request, responseChan) +func Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { + return DefaultClient.Stream(ctx, request, responseChan, opts...) } // Creates a streaming connection to the address specified. -func StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}) (Streamer, error) { - return DefaultClient.StreamRemote(ctx, address, request, responseChan) +func StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { + return DefaultClient.StreamRemote(ctx, address, request, responseChan, opts...) } // Publishes a publication using the default client. Using the underlying broker diff --git a/client/options.go b/client/options.go index cf70ea89..f88d34e4 100644 --- a/client/options.go +++ b/client/options.go @@ -17,6 +17,10 @@ type options struct { selector Selector } +type callOptions struct{} + +type publishOptions struct{} + // Broker to be used for pub/sub func Broker(b broker.Broker) Option { return func(o *options) { diff --git a/client/rpc_client.go b/client/rpc_client.go index 0f07a7de..856077be 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -148,12 +148,11 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, }, nil } -func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}) error { +func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error { return r.call(ctx, address, request, response) } -// TODO: Call(..., opts *Options) error { -func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}) error { +func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { node, err := r.sel.Select(ctx, request) if err != nil { return err @@ -169,11 +168,11 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return err } -func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}) (Streamer, error) { +func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { return r.stream(ctx, address, request, responseChan) } -func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}) (Streamer, error) { +func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { node, err := r.sel.Select(ctx, request) if err != nil { return nil, err @@ -189,7 +188,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in return stream, err } -func (r *rpcClient) Publish(ctx context.Context, p Publication) error { +func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error { md, ok := c.GetMetadata(ctx) if !ok { md = make(map[string]string) diff --git a/examples/client/wrapper/wrapper.go b/examples/client/wrapper/wrapper.go index ac2709bb..ca35772e 100644 --- a/examples/client/wrapper/wrapper.go +++ b/examples/client/wrapper/wrapper.go @@ -18,7 +18,7 @@ type logWrapper struct { client.Client } -func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}) error { +func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { md, _ := c.GetMetadata(ctx) fmt.Printf("[Log Wrapper] ctx: %v service: %s method: %s\n", md, req.Service(), req.Method()) return l.Client.Call(ctx, req, rsp) @@ -29,7 +29,7 @@ type traceWrapper struct { client.Client } -func (t *traceWrapper) Call(ctx context.Context, req client.Request, rsp interface{}) error { +func (t *traceWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { ctx = c.WithMetadata(ctx, map[string]string{ "X-Trace-Id": fmt.Sprintf("%d", time.Now().Unix()), })