From b1511ed8134c1fa197c230f2826f5d661aa2a8c0 Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 2 Dec 2015 11:54:36 +0000 Subject: [PATCH] Update wrapper and examples --- client/rpcplus_client.go | 10 ---- examples/client/main.go | 52 ----------------- examples/client/wrapper/wrapper.go | 91 ++++++++++++++++++++++++++++++ examples/server/main.go | 16 +++++- server/options.go | 32 +++++++---- server/rpcplus_server.go | 9 +-- server/server_wrapper.go | 6 +- 7 files changed, 130 insertions(+), 86 deletions(-) create mode 100644 examples/client/wrapper/wrapper.go diff --git a/client/rpcplus_client.go b/client/rpcplus_client.go index 730960e8..4fb2d400 100644 --- a/client/rpcplus_client.go +++ b/client/rpcplus_client.go @@ -56,14 +56,6 @@ type client struct { shutdown bool } -// A clientCodec implements writing of RPC requests and -// reading of RPC responses for the client side of an RPC session. -// The client calls WriteRequest to write a request to the connection -// and calls ReadResponseHeader and ReadResponseBody in pairs -// to read responses. The client calls Close when finished with the -// connection. ReadResponseBody may be called with a nil -// argument to force the body of the response to be read and then -// discarded. type clientCodec interface { WriteRequest(*request, interface{}) error ReadResponseHeader(*response) error @@ -224,8 +216,6 @@ func (call *call) done() { } } -// NewclientWithCodec is like Newclient but uses the specified -// codec to encode requests and decode responses. func newClientWithCodec(codec clientCodec) *client { client := &client{ codec: codec, diff --git a/examples/client/main.go b/examples/client/main.go index dbb43525..369553a8 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "time" "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" @@ -11,41 +10,6 @@ import ( "golang.org/x/net/context" ) -// wrapper example code - -// log wrapper logs every time a request is made -type logWrapper struct { - client.Client -} - -func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}) error { - md, _ := c.GetMetadata(ctx) - fmt.Printf("[Log Wrapper] ctx: %v service: %s method: %s\n", md, req.Service(), req.Method()) - return l.Client.Call(ctx, req, rsp) -} - -// trace wrapper attaches a unique trace ID - timestamp -type traceWrapper struct { - client.Client -} - -func (t *traceWrapper) Call(ctx context.Context, req client.Request, rsp interface{}) error { - ctx = c.WithMetadata(ctx, map[string]string{ - "X-Trace-Id": fmt.Sprintf("%d", time.Now().Unix()), - }) - return t.Client.Call(ctx, req, rsp) -} - -// Implements client.Wrapper as logWrapper -func logWrap(c client.Client) client.Client { - return &logWrapper{c} -} - -// Implements client.Wrapper as traceWrapper -func traceWrap(c client.Client) client.Client { - return &traceWrapper{c} -} - // publishes a message func pub() { msg := client.NewPublication("topic.go.micro.srv.example", &example.Message{ @@ -120,7 +84,6 @@ func stream() { func main() { cmd.Init() - fmt.Println("\n--- Call example ---\n") for i := 0; i < 10; i++ { call(i) @@ -131,19 +94,4 @@ func main() { fmt.Println("\n--- Publisher example ---\n") pub() - - fmt.Println("\n--- Wrapper example ---\n") - - // Wrap the default client - client.DefaultClient = logWrap(client.DefaultClient) - - call(0) - - // Wrap using client.Wrap option - client.DefaultClient = client.NewClient( - client.Wrap(traceWrap), - client.Wrap(logWrap), - ) - - call(1) } diff --git a/examples/client/wrapper/wrapper.go b/examples/client/wrapper/wrapper.go new file mode 100644 index 00000000..ac2709bb --- /dev/null +++ b/examples/client/wrapper/wrapper.go @@ -0,0 +1,91 @@ +package main + +import ( + "fmt" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + c "github.com/micro/go-micro/context" + example "github.com/micro/go-micro/examples/server/proto/example" + "golang.org/x/net/context" +) + +// wrapper example code + +// log wrapper logs every time a request is made +type logWrapper struct { + client.Client +} + +func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}) error { + md, _ := c.GetMetadata(ctx) + fmt.Printf("[Log Wrapper] ctx: %v service: %s method: %s\n", md, req.Service(), req.Method()) + return l.Client.Call(ctx, req, rsp) +} + +// trace wrapper attaches a unique trace ID - timestamp +type traceWrapper struct { + client.Client +} + +func (t *traceWrapper) Call(ctx context.Context, req client.Request, rsp interface{}) error { + ctx = c.WithMetadata(ctx, map[string]string{ + "X-Trace-Id": fmt.Sprintf("%d", time.Now().Unix()), + }) + return t.Client.Call(ctx, req, rsp) +} + +// Implements client.Wrapper as logWrapper +func logWrap(c client.Client) client.Client { + return &logWrapper{c} +} + +// Implements client.Wrapper as traceWrapper +func traceWrap(c client.Client) client.Client { + return &traceWrapper{c} +} + +func call(i int) { + // Create new request to service go.micro.srv.example, method Example.Call + req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ + Name: "John", + }) + + // create context with metadata + ctx := c.WithMetadata(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + rsp := &example.Response{} + + // Call service + if err := client.Call(ctx, req, rsp); err != nil { + fmt.Println("call err: ", err, rsp) + return + } + + fmt.Println("Call:", i, "rsp:", rsp.Msg) +} + +func main() { + cmd.Init() + + fmt.Println("\n--- Log Wrapper example ---\n") + + // Wrap the default client + client.DefaultClient = logWrap(client.DefaultClient) + + call(0) + + fmt.Println("\n--- Log+Trace Wrapper example ---\n") + + // Wrap using client.Wrap option + client.DefaultClient = client.NewClient( + client.Wrap(traceWrap), + client.Wrap(logWrap), + ) + + call(1) +} diff --git a/examples/server/main.go b/examples/server/main.go index 846b0241..409b68eb 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -6,15 +6,25 @@ import ( "github.com/micro/go-micro/examples/server/handler" "github.com/micro/go-micro/examples/server/subscriber" "github.com/micro/go-micro/server" + "golang.org/x/net/context" ) +func logWrapper(fn server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req interface{}, rsp interface{}) error { + log.Infof("[Log Wrapper] Before serving request") + err := fn(ctx, req, rsp) + log.Infof("[Log Wrapper] After serving request") + return err + } +} + func main() { // optionally setup command line usage cmd.Init() - // server.DefaultServer = server.NewServer( - // server.Codec("application/bson", bson.Codec), - // ) + server.DefaultServer = server.NewServer( + server.WrapHandler(logWrapper), + ) // Initialise Server server.Init( diff --git a/server/options.go b/server/options.go index ec4d281f..b06bd5d7 100644 --- a/server/options.go +++ b/server/options.go @@ -8,17 +8,18 @@ import ( ) type options struct { - codecs map[string]codec.NewCodec - broker broker.Broker - registry registry.Registry - transport transport.Transport - metadata map[string]string - name string - address string - advertise string - id string - version string - wrappers []Wrapper + codecs map[string]codec.NewCodec + broker broker.Broker + registry registry.Registry + transport transport.Transport + metadata map[string]string + name string + address string + advertise string + id string + version string + wrappers []HandlerWrapper + subWrappers []SubscriberWrapper } func newOptions(opt ...Option) options { @@ -156,8 +157,15 @@ func Metadata(md map[string]string) Option { } // Adds a handler Wrapper to a list of options passed into the server -func Wrap(w Wrapper) Option { +func WrapHandler(w HandlerWrapper) Option { return func(o *options) { o.wrappers = append(o.wrappers, w) } } + +// Adds a subscriber Wrapper to a list of options passed into the server +func WrapSubscriber(w SubscriberWrapper) Option { + return func(o *options) { + o.subWrappers = append(o.subWrappers, w) + } +} diff --git a/server/rpcplus_server.go b/server/rpcplus_server.go index e73c2ac7..19a05e1e 100644 --- a/server/rpcplus_server.go +++ b/server/rpcplus_server.go @@ -75,7 +75,7 @@ type server struct { freeReq *request respLock sync.Mutex // protects freeResp freeResp *response - wrappers []Wrapper + wrappers []HandlerWrapper } // Is this an exported - upper case - name? @@ -465,13 +465,6 @@ func (server *server) readRequestHeader(codec serverCodec) (service *service, mt return } -// A serverCodec implements reading of RPC requests and writing of -// RPC responses for the server side of an RPC session. -// The server calls ReadRequestHeader and ReadRequestBody in pairs -// to read requests from the connection, and it calls WriteResponse to -// write a response back. The server calls Close when finished with the -// connection. ReadRequestBody may be called with a nil -// argument to force the body of the request to be read and discarded. type serverCodec interface { ReadRequestHeader(*request) error ReadRequestBody(interface{}) error diff --git a/server/server_wrapper.go b/server/server_wrapper.go index 3657846d..835145b1 100644 --- a/server/server_wrapper.go +++ b/server/server_wrapper.go @@ -6,4 +6,8 @@ import ( type HandlerFunc func(ctx context.Context, req interface{}, rsp interface{}) error -type Wrapper func(HandlerFunc) HandlerFunc +type SubscriberFunc func(ctx context.Context, msg interface{}) error + +type HandlerWrapper func(HandlerFunc) HandlerFunc + +type SubscriberWrapper func(SubscriberFunc) SubscriberFunc