Merge pull request #136 from micro/wrapper

Call Wrapper
This commit is contained in:
Asim Aslam 2016-11-09 18:12:36 +00:00 committed by GitHub
commit 74d8ea6817
5 changed files with 125 additions and 1 deletions

View File

@ -34,6 +34,16 @@ Example usage:
*/ */
import (
"golang.org/x/net/context"
)
// CallFunc represents the individual call func
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
// CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc
// Wrapper wraps a client and returns a client // Wrapper wraps a client and returns a client
type Wrapper func(Client) Client type Wrapper func(Client) Client

View File

@ -52,6 +52,9 @@ type CallOptions struct {
// Request/Response timeout // Request/Response timeout
RequestTimeout time.Duration RequestTimeout time.Duration
// Middleware for low level call func
CallWrappers []CallWrapper
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
Context context.Context Context context.Context
@ -177,6 +180,13 @@ func Wrap(w Wrapper) Option {
} }
} }
// Adds a Wrapper to the list of CallFunc wrappers
func WrapCall(cw ...CallWrapper) Option {
return func(o *Options) {
o.CallOptions.CallWrappers = append(o.CallOptions.CallWrappers, cw...)
}
}
// Backoff is used to set the backoff function used // Backoff is used to set the backoff function used
// when retrying Calls // when retrying Calls
func Backoff(fn BackoffFunc) Option { func Backoff(fn BackoffFunc) Option {
@ -216,6 +226,13 @@ func WithSelectOption(so ...selector.SelectOption) CallOption {
} }
} }
// WithCallWrapper is a CallOption which adds to the existing CallFunc wrappers
func WithCallWrapper(cw ...CallWrapper) CallOption {
return func(o *CallOptions) {
o.CallWrappers = append(o.CallWrappers, cw...)
}
}
// WithBackoff is a CallOption which overrides that which // WithBackoff is a CallOption which overrides that which
// set in Options.CallOptions // set in Options.CallOptions
func WithBackoff(fn BackoffFunc) CallOption { func WithBackoff(fn BackoffFunc) CallOption {

View File

@ -250,6 +250,14 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
default: default:
} }
// make copy of call method
rcall := r.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
rcall = callOpts.CallWrappers[i-1](rcall)
}
// return errors.New("go.micro.client", "request timeout", 408) // return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error { call := func(i int) error {
// call backoff first. Someone may want an initial start delay // call backoff first. Someone may want an initial start delay
@ -278,7 +286,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
} }
// make the call // make the call
err = r.call(ctx, address, request, response, callOpts) err = rcall(ctx, address, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err) r.opts.Selector.Mark(request.Service(), node, err)
return err return err
} }

71
client/rpc_client_test.go Normal file
View File

@ -0,0 +1,71 @@
package client
import (
"fmt"
"testing"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
func TestCallWrapper(t *testing.T) {
var called bool
id := "test.1"
service := "test.service"
method := "Test.Method"
host := "10.1.10.1"
port := 8080
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}
if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
}
if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
r.Register(&registry.Service{
Name: service,
Version: "latest",
Nodes: []*registry.Node{
&registry.Node{
Id: id,
Address: host,
Port: port,
},
},
})
req := c.NewRequest(service, method, nil)
if err := c.Call(context.Background(), req, nil); err != nil {
t.Fatal("call wrapper error", err)
}
if !called {
t.Fatal("wrapper not called")
}
}

View File

@ -46,6 +46,15 @@ func traceWrap(c client.Client) client.Client {
return &traceWrapper{c} return &traceWrapper{c}
} }
func metricsWrap(cf client.CallFunc) client.CallFunc {
return func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
t := time.Now()
err := cf(ctx, addr, req, rsp, opts)
fmt.Printf("[Metrics Wrapper] called: %s %s.%s duration: %v\n", addr, req.Service(), req.Method(), time.Since(t))
return err
}
}
func call(i int) { func call(i int) {
// Create new request to service go.micro.srv.example, method Example.Call // Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
@ -88,4 +97,13 @@ func main() {
) )
call(1) call(1)
fmt.Println("\n--- Metrics Wrapper example ---\n")
// Wrap using client.Wrap option
client.DefaultClient = client.NewClient(
client.WrapCall(metricsWrap),
)
call(2)
} }