util/wrapper: Add Static Client wrapper (#1685)
* util/wrapper: Add Static Client wrapper * util/wrapper/static: pass address to stream too Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * add static client wrapper tests Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * server: fix error message spaces between words Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * server/{rpc,grpc}: replace log.Error with log.Errorf * server/grpc: fix log typo * server/rpc: fix log typo Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
582f2e8b94
commit
bbc3b7040b
2
go.mod
2
go.mod
@ -68,7 +68,7 @@ require (
|
|||||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
|
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
|
||||||
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1
|
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1
|
||||||
google.golang.org/grpc v1.26.0
|
google.golang.org/grpc v1.26.0
|
||||||
google.golang.org/protobuf v1.22.0 // indirect
|
google.golang.org/protobuf v1.22.0
|
||||||
gopkg.in/telegram-bot-api.v4 v4.6.4
|
gopkg.in/telegram-bot-api.v4 v4.6.4
|
||||||
sigs.k8s.io/yaml v1.1.0 // indirect
|
sigs.k8s.io/yaml v1.1.0 // indirect
|
||||||
)
|
)
|
||||||
|
@ -87,7 +87,7 @@ func prepareEndpoint(method reflect.Method) *methodType {
|
|||||||
contextType = mtype.In(1)
|
contextType = mtype.In(1)
|
||||||
default:
|
default:
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn())
|
logger.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ func prepareEndpoint(method reflect.Method) *methodType {
|
|||||||
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
|
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
|
||||||
if !argType.Implements(streamType) {
|
if !argType.Implements(streamType) {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error(mname, "argument does not implement Streamer interface:", argType)
|
logger.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -107,14 +107,14 @@ func prepareEndpoint(method reflect.Method) *methodType {
|
|||||||
// First arg need not be a pointer.
|
// First arg need not be a pointer.
|
||||||
if !isExportedOrBuiltinType(argType) {
|
if !isExportedOrBuiltinType(argType) {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error(mname, "argument type not exported:", argType)
|
logger.Errorf("%v argument type not exported: %v", mname, argType)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if replyType.Kind() != reflect.Ptr {
|
if replyType.Kind() != reflect.Ptr {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error("method", mname, "reply type not a pointer:", replyType)
|
logger.Errorf("method %v reply type not a pointer: %v", mname, replyType)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ func prepareEndpoint(method reflect.Method) *methodType {
|
|||||||
// Reply type must be exported.
|
// Reply type must be exported.
|
||||||
if !isExportedOrBuiltinType(replyType) {
|
if !isExportedOrBuiltinType(replyType) {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error("method", mname, "reply type not exported:", replyType)
|
logger.Errorf("method %v reply type not exported: %v", mname, replyType)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -131,14 +131,14 @@ func prepareEndpoint(method reflect.Method) *methodType {
|
|||||||
// Endpoint() needs one out.
|
// Endpoint() needs one out.
|
||||||
if mtype.NumOut() != 1 {
|
if mtype.NumOut() != 1 {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error("method", mname, "has wrong number of outs:", mtype.NumOut())
|
logger.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// The return type of the method must be error.
|
// The return type of the method must be error.
|
||||||
if returnType := mtype.Out(0); returnType != typeOfError {
|
if returnType := mtype.Out(0); returnType != typeOfError {
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||||
logger.Error("method", mname, "returns", returnType.String(), "not error")
|
logger.Errorf("method %v returns %v not error", mname, returnType.String())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -156,7 +156,7 @@ func (server *rServer) register(rcvr interface{}) error {
|
|||||||
s.rcvr = reflect.ValueOf(rcvr)
|
s.rcvr = reflect.ValueOf(rcvr)
|
||||||
sname := reflect.Indirect(s.rcvr).Type().Name()
|
sname := reflect.Indirect(s.rcvr).Type().Name()
|
||||||
if sname == "" {
|
if sname == "" {
|
||||||
logger.Fatal("rpc: no service name for type", s.typ.String())
|
logger.Fatalf("rpc: no service name for type %v", s.typ.String())
|
||||||
}
|
}
|
||||||
if !isExported(sname) {
|
if !isExported(sname) {
|
||||||
s := "rpc Register: type " + sname + " is not exported"
|
s := "rpc Register: type " + sname + " is not exported"
|
||||||
|
@ -140,7 +140,7 @@ func prepareMethod(method reflect.Method) *methodType {
|
|||||||
replyType = mtype.In(3)
|
replyType = mtype.In(3)
|
||||||
contextType = mtype.In(1)
|
contextType = mtype.In(1)
|
||||||
default:
|
default:
|
||||||
log.Error("method", mname, "of", mtype, "has wrong number of ins:", mtype.NumIn())
|
log.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,7 +148,7 @@ func prepareMethod(method reflect.Method) *methodType {
|
|||||||
// check stream type
|
// check stream type
|
||||||
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
|
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
|
||||||
if !argType.Implements(streamType) {
|
if !argType.Implements(streamType) {
|
||||||
log.Error(mname, "argument does not implement Stream interface:", argType)
|
log.Errorf("%v argument does not implement Stream interface: %v", mname, argType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -156,30 +156,30 @@ func prepareMethod(method reflect.Method) *methodType {
|
|||||||
|
|
||||||
// First arg need not be a pointer.
|
// First arg need not be a pointer.
|
||||||
if !isExportedOrBuiltinType(argType) {
|
if !isExportedOrBuiltinType(argType) {
|
||||||
log.Error(mname, "argument type not exported:", argType)
|
log.Errorf("%v argument type not exported: %v", mname, argType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if replyType.Kind() != reflect.Ptr {
|
if replyType.Kind() != reflect.Ptr {
|
||||||
log.Error("method", mname, "reply type not a pointer:", replyType)
|
log.Errorf("method %v reply type not a pointer: %v", mname, replyType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reply type must be exported.
|
// Reply type must be exported.
|
||||||
if !isExportedOrBuiltinType(replyType) {
|
if !isExportedOrBuiltinType(replyType) {
|
||||||
log.Error("method", mname, "reply type not exported:", replyType)
|
log.Errorf("method %v reply type not exported: %v", mname, replyType)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Method needs one out.
|
// Method needs one out.
|
||||||
if mtype.NumOut() != 1 {
|
if mtype.NumOut() != 1 {
|
||||||
log.Error("method", mname, "has wrong number of outs:", mtype.NumOut())
|
log.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// The return type of the method must be error.
|
// The return type of the method must be error.
|
||||||
if returnType := mtype.Out(0); returnType != typeOfError {
|
if returnType := mtype.Out(0); returnType != typeOfError {
|
||||||
log.Error("method", mname, "returns", returnType.String(), "not error")
|
log.Errorf("method %v returns %v not error", mname, returnType.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
|
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
|
||||||
@ -508,7 +508,7 @@ func (router *router) ProcessMessage(ctx context.Context, msg Message) (err erro
|
|||||||
defer func() {
|
defer func() {
|
||||||
// recover any panics
|
// recover any panics
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.Error("panic recovered: ", r)
|
log.Errorf("panic recovered: %v", r)
|
||||||
log.Error(string(debug.Stack()))
|
log.Error(string(debug.Stack()))
|
||||||
err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r)
|
err = merrors.InternalServerError("go.micro.server", "panic recovered: %v", r)
|
||||||
}
|
}
|
||||||
|
@ -292,3 +292,21 @@ func (c *cacheWrapper) Call(ctx context.Context, req client.Request, rsp interfa
|
|||||||
func CacheClient(cacheFn func() *client.Cache, c client.Client) client.Client {
|
func CacheClient(cacheFn func() *client.Cache, c client.Client) client.Client {
|
||||||
return &cacheWrapper{cacheFn, c}
|
return &cacheWrapper{cacheFn, c}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type staticClient struct {
|
||||||
|
address string
|
||||||
|
client.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *staticClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
|
return s.Client.Call(ctx, req, rsp, append(opts, client.WithAddress(s.address))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *staticClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
return s.Client.Stream(ctx, req, append(opts, client.WithAddress(s.address))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StaticClient sets an address on every call
|
||||||
|
func StaticClient(address string, c client.Client) client.Client {
|
||||||
|
return &staticClient{address, c}
|
||||||
|
}
|
||||||
|
72
util/wrapper/wrapper_static_client_test.go
Normal file
72
util/wrapper/wrapper_static_client_test.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
package wrapper_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v2/broker"
|
||||||
|
bmemory "github.com/micro/go-micro/v2/broker/memory"
|
||||||
|
"github.com/micro/go-micro/v2/client"
|
||||||
|
rmemory "github.com/micro/go-micro/v2/registry/memory"
|
||||||
|
"github.com/micro/go-micro/v2/server"
|
||||||
|
tmemory "github.com/micro/go-micro/v2/transport/memory"
|
||||||
|
wrapper "github.com/micro/go-micro/v2/util/wrapper"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TestFoo struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestReq struct{}
|
||||||
|
|
||||||
|
type TestRsp struct {
|
||||||
|
Data string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *TestFoo) Bar(ctx context.Context, req *TestReq, rsp *TestRsp) error {
|
||||||
|
rsp.Data = "pass"
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStaticClientWrapper(t *testing.T) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
req := client.NewRequest("go.micro.service.foo", "TestFoo.Bar", &TestReq{}, client.WithContentType("application/json"))
|
||||||
|
rsp := &TestRsp{}
|
||||||
|
|
||||||
|
reg := rmemory.NewRegistry()
|
||||||
|
brk := bmemory.NewBroker(broker.Registry(reg))
|
||||||
|
tr := tmemory.NewTransport()
|
||||||
|
|
||||||
|
srv := server.NewServer(
|
||||||
|
server.Broker(brk),
|
||||||
|
server.Registry(reg),
|
||||||
|
server.Name("go.micro.service.foo"),
|
||||||
|
server.Address("127.0.0.1:0"),
|
||||||
|
server.Transport(tr),
|
||||||
|
)
|
||||||
|
if err = srv.Handle(srv.NewHandler(&TestFoo{})); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = srv.Start(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cli := client.NewClient(
|
||||||
|
client.Registry(reg),
|
||||||
|
client.Broker(brk),
|
||||||
|
client.Transport(tr),
|
||||||
|
)
|
||||||
|
|
||||||
|
w1 := wrapper.StaticClient("xxx_localhost:12345", cli)
|
||||||
|
if err = w1.Call(context.TODO(), req, nil); err == nil {
|
||||||
|
t.Fatal("address xxx_#localhost:12345 must not exists and call must be failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
w2 := wrapper.StaticClient(srv.Options().Address, cli)
|
||||||
|
if err = w2.Call(context.TODO(), req, rsp); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if rsp.Data != "pass" {
|
||||||
|
t.Fatalf("something wrong with response: %#+v", rsp)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user