diff --git a/client/client_wrapper.go b/client/client_wrapper.go index 382747a3..099f90be 100644 --- a/client/client_wrapper.go +++ b/client/client_wrapper.go @@ -34,8 +34,24 @@ Example usage: */ +import ( + "golang.org/x/net/context" +) + +// CallFunc represents the individual call func +type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error + +// StreamFunc represents the individual stream func +type StreamFunc func(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) + // Wrapper wraps a client and returns a client type Wrapper func(Client) Client // StreamWrapper wraps a Stream and returns the equivalent type StreamWrapper func(Streamer) Streamer + +// CallFuncWrapper is a low level wrapper for the CallFunc +type CallFuncWrapper func(CallFunc) CallFunc + +// StreamFuncWrapper is a low level wrapper for the StreamFunc +type StreamFuncWrapper func(StreamFunc) StreamFunc diff --git a/client/options.go b/client/options.go index 0f53c8db..dd3db1ed 100644 --- a/client/options.go +++ b/client/options.go @@ -52,6 +52,9 @@ type CallOptions struct { // Request/Response timeout RequestTimeout time.Duration + // Middleware for low level call func + CallWrappers []CallFuncWrapper + // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -177,6 +180,13 @@ func Wrap(w Wrapper) Option { } } +// Adds a Wrapper to the list of CallFunc wrappers +func WrapCallFunc(cw ...CallFuncWrapper) Option { + return func(o *Options) { + o.CallOptions.CallWrappers = append(o.CallOptions.CallWrappers, cw...) + } +} + // Backoff is used to set the backoff function used // when retrying Calls func Backoff(fn BackoffFunc) Option { @@ -216,6 +226,13 @@ func WithSelectOption(so ...selector.SelectOption) CallOption { } } +// WithCallFuncWrapper is a CallOption which adds to the existing CallFunc wrappers +func WithCallFuncWrapper(cw ...CallFuncWrapper) CallOption { + return func(o *CallOptions) { + o.CallWrappers = append(o.CallWrappers, cw...) + } +} + // WithBackoff is a CallOption which overrides that which // set in Options.CallOptions func WithBackoff(fn BackoffFunc) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 861ddc53..ee40728f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -277,8 +277,14 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac address = fmt.Sprintf("%s:%d", address, node.Port) } + // wrap the call in reverse + rcall := r.call + for i := len(callOpts.CallWrappers); i > 0; i-- { + rcall = callOpts.CallWrappers[i-1](rcall) + } + // make the call - err = r.call(ctx, address, request, response, callOpts) + err = rcall(ctx, address, request, response, callOpts) r.opts.Selector.Mark(request.Service(), node, err) return err }