From 8dfd93e915b2f4d4c492f57e7bda585a746141ce Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Fri, 5 Jun 2020 10:18:35 +0100 Subject: [PATCH] util/wrapper: Add Static Client wrapper (#1685) * util/wrapper: Add Static Client wrapper * util/wrapper/static: pass address to stream too Signed-off-by: Vasiliy Tolstov * add static client wrapper tests Signed-off-by: Vasiliy Tolstov * server: fix error message spaces between words Signed-off-by: Vasiliy Tolstov * server/{rpc,grpc}: replace log.Error with log.Errorf * server/grpc: fix log typo * server/rpc: fix log typo Co-authored-by: Vasiliy Tolstov --- go.mod | 2 +- server/grpc/server.go | 16 ++--- server/rpc_router.go | 16 ++--- util/wrapper/wrapper.go | 18 ++++++ util/wrapper/wrapper_static_client_test.go | 72 ++++++++++++++++++++++ 5 files changed, 107 insertions(+), 17 deletions(-) create mode 100644 util/wrapper/wrapper_static_client_test.go diff --git a/go.mod b/go.mod index 48491e5c..dea31673 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1 google.golang.org/grpc v1.26.0 - google.golang.org/protobuf v1.22.0 // indirect + google.golang.org/protobuf v1.22.0 gopkg.in/telegram-bot-api.v4 v4.6.4 sigs.k8s.io/yaml v1.1.0 // indirect ) diff --git a/server/grpc/server.go b/server/grpc/server.go index 024a90f1..80abb711 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -87,7 +87,7 @@ func prepareEndpoint(method reflect.Method) *methodType { contextType = mtype.In(1) default: if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn()) + logger.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) } return nil } @@ -97,7 +97,7 @@ func prepareEndpoint(method reflect.Method) *methodType { streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() if !argType.Implements(streamType) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(mname, "argument does not implement Streamer interface:", argType) + logger.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) } return nil } @@ -107,14 +107,14 @@ func prepareEndpoint(method reflect.Method) *methodType { // First arg need not be a pointer. if !isExportedOrBuiltinType(argType) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(mname, "argument type not exported:", argType) + logger.Errorf("%v argument type not exported: %v", mname, argType) } return nil } if replyType.Kind() != reflect.Ptr { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error("method", mname, "reply type not a pointer:", replyType) + logger.Errorf("method %v reply type not a pointer: %v", mname, replyType) } return nil } @@ -122,7 +122,7 @@ func prepareEndpoint(method reflect.Method) *methodType { // Reply type must be exported. if !isExportedOrBuiltinType(replyType) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error("method", mname, "reply type not exported:", replyType) + logger.Errorf("method %v reply type not exported: %v", mname, replyType) } return nil } @@ -131,14 +131,14 @@ func prepareEndpoint(method reflect.Method) *methodType { // Endpoint() needs one out. if mtype.NumOut() != 1 { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error("method", mname, "has wrong number of outs:", mtype.NumOut()) + logger.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) } return nil } // The return type of the method must be error. if returnType := mtype.Out(0); returnType != typeOfError { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error("method", mname, "returns", returnType.String(), "not error") + logger.Errorf("method %v returns %v not error", mname, returnType.String()) } return nil } @@ -156,7 +156,7 @@ func (server *rServer) register(rcvr interface{}) error { s.rcvr = reflect.ValueOf(rcvr) sname := reflect.Indirect(s.rcvr).Type().Name() if sname == "" { - logger.Fatal("rpc: no service name for type", s.typ.String()) + logger.Fatalf("rpc: no service name for type %v", s.typ.String()) } if !isExported(sname) { s := "rpc Register: type " + sname + " is not exported" diff --git a/server/rpc_router.go b/server/rpc_router.go index cd5d64df..749a6591 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -140,7 +140,7 @@ func prepareMethod(method reflect.Method) *methodType { replyType = mtype.In(3) contextType = mtype.In(1) default: - log.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn()) + log.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) return nil } @@ -148,7 +148,7 @@ func prepareMethod(method reflect.Method) *methodType { // check stream type streamType := reflect.TypeOf((*Stream)(nil)).Elem() if !argType.Implements(streamType) { - log.Error(mname, "argument does not implement Stream interface:", argType) + log.Errorf("%v argument does not implement Stream interface: %v", mname, argType) return nil } } else { @@ -156,30 +156,30 @@ func prepareMethod(method reflect.Method) *methodType { // First arg need not be a pointer. if !isExportedOrBuiltinType(argType) { - log.Error(mname, "argument type not exported:", argType) + log.Errorf("%v argument type not exported: %v", mname, argType) return nil } if replyType.Kind() != reflect.Ptr { - log.Error("method", mname, "reply type not a pointer:", replyType) + log.Errorf("method %v reply type not a pointer: %v", mname, replyType) return nil } // Reply type must be exported. if !isExportedOrBuiltinType(replyType) { - log.Error("method", mname, "reply type not exported:", replyType) + log.Errorf("method %v reply type not exported: %v", mname, replyType) return nil } } // Method needs one out. if mtype.NumOut() != 1 { - log.Error("method", mname, "has wrong number of outs:", mtype.NumOut()) + log.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) return nil } // The return type of the method must be error. if returnType := mtype.Out(0); returnType != typeOfError { - log.Error("method", mname, "returns", returnType.String(), "not error") + log.Errorf("method %v returns %v not error", mname, returnType.String()) return nil } return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} @@ -508,7 +508,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro defer func() { // recover any panics if r := recover(); r != nil { - log.Error("panic recovered: ", r) + log.Errorf("panic recovered: %v", r) log.Error(string(debug.Stack())) err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r) } diff --git a/util/wrapper/wrapper.go b/util/wrapper/wrapper.go index 5d3d6432..e3af20a0 100644 --- a/util/wrapper/wrapper.go +++ b/util/wrapper/wrapper.go @@ -292,3 +292,21 @@ func (c *cacheWrapper) Call(ctx context.Context, req client.Request, rsp interfa func CacheClient(cacheFn func() *client.Cache, c client.Client) client.Client { return &cacheWrapper{cacheFn, c} } + +type staticClient struct { + address string + client.Client +} + +func (s *staticClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + return s.Client.Call(ctx, req, rsp, append(opts, client.WithAddress(s.address))...) +} + +func (s *staticClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + return s.Client.Stream(ctx, req, append(opts, client.WithAddress(s.address))...) +} + +// StaticClient sets an address on every call +func StaticClient(address string, c client.Client) client.Client { + return &staticClient{address, c} +} diff --git a/util/wrapper/wrapper_static_client_test.go b/util/wrapper/wrapper_static_client_test.go new file mode 100644 index 00000000..6ccd58ef --- /dev/null +++ b/util/wrapper/wrapper_static_client_test.go @@ -0,0 +1,72 @@ +package wrapper_test + +import ( + "context" + "testing" + + "github.com/micro/go-micro/v2/broker" + bmemory "github.com/micro/go-micro/v2/broker/memory" + "github.com/micro/go-micro/v2/client" + rmemory "github.com/micro/go-micro/v2/registry/memory" + "github.com/micro/go-micro/v2/server" + tmemory "github.com/micro/go-micro/v2/transport/memory" + wrapper "github.com/micro/go-micro/v2/util/wrapper" +) + +type TestFoo struct { +} + +type TestReq struct{} + +type TestRsp struct { + Data string +} + +func (h *TestFoo) Bar(ctx context.Context, req *TestReq, rsp *TestRsp) error { + rsp.Data = "pass" + return nil +} + +func TestStaticClientWrapper(t *testing.T) { + var err error + + req := client.NewRequest("go.micro.service.foo", "TestFoo.Bar", &TestReq{}, client.WithContentType("application/json")) + rsp := &TestRsp{} + + reg := rmemory.NewRegistry() + brk := bmemory.NewBroker(broker.Registry(reg)) + tr := tmemory.NewTransport() + + srv := server.NewServer( + server.Broker(brk), + server.Registry(reg), + server.Name("go.micro.service.foo"), + server.Address("127.0.0.1:0"), + server.Transport(tr), + ) + if err = srv.Handle(srv.NewHandler(&TestFoo{})); err != nil { + t.Fatal(err) + } + + if err = srv.Start(); err != nil { + t.Fatal(err) + } + + cli := client.NewClient( + client.Registry(reg), + client.Broker(brk), + client.Transport(tr), + ) + + w1 := wrapper.StaticClient("xxx_localhost:12345", cli) + if err = w1.Call(context.TODO(), req, nil); err == nil { + t.Fatal("address xxx_#localhost:12345 must not exists and call must be failed") + } + + w2 := wrapper.StaticClient(srv.Options().Address, cli) + if err = w2.Call(context.TODO(), req, rsp); err != nil { + t.Fatal(err) + } else if rsp.Data != "pass" { + t.Fatalf("something wrong with response: %#+v", rsp) + } +}