diff --git a/go.mod b/go.mod index 4e7635f..d7d1a6b 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.1 // indirect github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 - github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 + github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 // indirect golang.org/x/text v0.3.3 // indirect diff --git a/go.sum b/go.sum index 6ae579f..1b9bc29 100644 --- a/go.sum +++ b/go.sum @@ -281,8 +281,8 @@ github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JB github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568 h1:2h+k414Q3ABTRHByIvPJYZbi5s8qlCi9yG7x3wqaFDs= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201102230232-8a2b12201568/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e h1:v27OUgoE2UOyCe6uLksdpG6oErx62nUXWIkTPxS7yIw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201104214903-1fbf8b2e209e/go.mod h1:LFvCXGOgcLIj2k/8eL71TpIpcJBN2SXXAUx8U6dz9Rw= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/grpc.go b/grpc.go index 9ca1f78..90e20f7 100644 --- a/grpc.go +++ b/grpc.go @@ -225,14 +225,20 @@ func (g *grpcServer) getListener() net.Listener { func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) { defer func() { if r := recover(); r != nil { - if logger.V(logger.ErrorLevel) { - logger.Error("panic recovered: ", r) - logger.Error(string(debug.Stack())) + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("panic recovered: ", r) + config.Logger.Error(string(debug.Stack())) } err = errors.InternalServerError(g.opts.Name, "panic recovered: %v", r) } else if err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("grpc handler got error: %s", err) + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("grpc handler got error: %s", err) } } }() @@ -364,7 +370,12 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err typ := reflect.TypeOf(rfl) if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { g.rpc.mu.Lock() - svc.method["ServerReflectionInfo"] = prepareEndpoint(me) + ep, err := prepareEndpoint(me) + if ep != nil && err != nil { + svc.method["ServerReflectionInfo"] = ep + } else if err != nil { + return status.New(codes.Unimplemented, err.Error()).Err() + } g.rpc.mu.Unlock() } } @@ -476,8 +487,11 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, return err } default: - if logger.V(logger.ErrorLevel) { - logger.Warn("handler error will not be transferred properly, must return *errors.Error or proto.Message") + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Warn("handler error will not be transferred properly, must return *errors.Error or proto.Message") } // default case user pass own error type that not proto based statusCode = convertCode(verr) @@ -598,8 +612,11 @@ func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, m return err } default: - if logger.V(logger.ErrorLevel) { - logger.Warn("handler error will not be transferred properly, must return *errors.Error or proto.Message") + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Warn("handler error will not be transferred properly, must return *errors.Error or proto.Message") } // default case user pass own error type that not proto based statusCode = convertCode(verr) @@ -767,8 +784,8 @@ func (g *grpcServer) Register() error { g.RUnlock() if !registered { - if logger.V(logger.InfoLevel) { - logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) } } @@ -799,8 +816,8 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.SubscribeContext(subCtx)) opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) - if logger.V(logger.InfoLevel) { - logger.Infof("Subscribing to topic: %s", sb.Topic()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) if err != nil { @@ -829,8 +846,8 @@ func (g *grpcServer) Deregister() error { return err } - if logger.V(logger.InfoLevel) { - logger.Infof("Deregistering node: %s", service.Nodes[0].Id) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Deregistering node: %s", service.Nodes[0].Id) } opt := registry.DeregisterDomain(g.opts.Namespace) @@ -854,12 +871,12 @@ func (g *grpcServer) Deregister() error { wg.Add(1) go func(s broker.Subscriber) { defer wg.Done() - if logger.V(logger.InfoLevel) { - logger.Infof("Unsubscribing from topic: %s", s.Topic()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Unsubscribing from topic: %s", s.Topic()) } if err := s.Unsubscribe(g.opts.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Unsubscribing from topic: %s err: %v", s.Topic(), err) } } }(sub) @@ -908,8 +925,8 @@ func (g *grpcServer) Start() error { } } - if logger.V(logger.InfoLevel) { - logger.Infof("Server [grpc] Listening on %s", ts.Addr().String()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Server [grpc] Listening on %s", ts.Addr().String()) } g.Lock() g.opts.Address = ts.Addr().String() @@ -922,27 +939,27 @@ func (g *grpcServer) Start() error { if len(g.subscribers) > 0 { // connect to the broker if err := config.Broker.Connect(config.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Broker [%s] connect error: %v", config.Broker.String(), err) } return err } - if logger.V(logger.InfoLevel) { - logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } // use RegisterCheck func before register if err := g.opts.RegisterCheck(config.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, err) } } else { // announce self to the world if err := g.Register(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server register error: %v", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server register error: %v", err) } } } @@ -950,12 +967,12 @@ func (g *grpcServer) Start() error { // micro: go ts.Accept(s.accept) go func() { if err := g.srv.Serve(ts); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("gRPC Server start error: %v", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("gRPC Server start error: %v", err) } if err := g.Stop(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("gRPC Server stop error: %v", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("gRPC Server stop error: %v", err) } } } @@ -983,24 +1000,24 @@ func (g *grpcServer) Start() error { g.RUnlock() rerr := g.opts.RegisterCheck(g.opts.Context) if rerr != nil && registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) } // deregister self in case of error if err := g.Deregister(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } } else if rerr != nil && !registered { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register check error: %s", config.Name, config.Id, rerr) } continue } if err := g.Register(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit @@ -1011,8 +1028,8 @@ func (g *grpcServer) Start() error { // deregister self if err := g.Deregister(); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Error("Server deregister error: ", err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Server deregister error: ", err) } } @@ -1038,13 +1055,13 @@ func (g *grpcServer) Start() error { // close transport ch <- nil - if logger.V(logger.InfoLevel) { - logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + if config.Logger.V(logger.InfoLevel) { + config.Logger.Info("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker if err := config.Broker.Disconnect(config.Context); err != nil { - if logger.V(logger.ErrorLevel) { - logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Error("Broker [%s] disconnect error: %v", config.Broker.String(), err) } } }() diff --git a/server.go b/server.go index 2c7e215..84df065 100644 --- a/server.go +++ b/server.go @@ -14,7 +14,6 @@ import ( "unicode" "unicode/utf8" - "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/server" ) @@ -66,7 +65,7 @@ func isExportedOrBuiltinType(t reflect.Type) bool { // prepareEndpoint() returns a methodType for the provided method or nil // in case if the method was unsuitable. -func prepareEndpoint(method reflect.Method) *methodType { +func prepareEndpoint(method reflect.Method) (*methodType, error) { mtype := method.Type mname := method.Name var replyType, argType, contextType reflect.Type @@ -74,7 +73,7 @@ func prepareEndpoint(method reflect.Method) *methodType { // Endpoint() must be exported. if method.PkgPath != "" { - return nil + return nil, fmt.Errorf("Endpoint must be exported") } switch mtype.NumIn() { @@ -89,63 +88,42 @@ func prepareEndpoint(method reflect.Method) *methodType { replyType = mtype.In(3) contextType = mtype.In(1) default: - if logger.V(logger.ErrorLevel) { - logger.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) - } - return nil + return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) } if stream { // check stream type streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() if !argType.Implements(streamType) { - if logger.V(logger.ErrorLevel) { - logger.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) - } - return nil + return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) } } else { // if not stream check the replyType // First arg need not be a pointer. if !isExportedOrBuiltinType(argType) { - if logger.V(logger.ErrorLevel) { - logger.Errorf("%v argument type not exported: %v", mname, argType) - } - return nil + return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) } if replyType.Kind() != reflect.Ptr { - if logger.V(logger.ErrorLevel) { - logger.Errorf("method %v reply type not a pointer: %v", mname, replyType) - } - return nil + return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType) } // Reply type must be exported. if !isExportedOrBuiltinType(replyType) { - if logger.V(logger.ErrorLevel) { - logger.Errorf("method %v reply type not exported: %v", mname, replyType) - } - return nil + return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType) } } // Endpoint() needs one out. if mtype.NumOut() != 1 { - if logger.V(logger.ErrorLevel) { - logger.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) - } - return nil + return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut()) } // The return type of the method must be error. if returnType := mtype.Out(0); returnType != typeOfError { - if logger.V(logger.ErrorLevel) { - logger.Errorf("method %v returns %v not error", mname, returnType.String()) - } - return nil + return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String()) } - return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream} + return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil } func (server *rServer) register(rcvr interface{}) error { @@ -162,11 +140,7 @@ func (server *rServer) register(rcvr interface{}) error { return fmt.Errorf("rpc: no service name for type %v", s.typ.String()) } if !isExported(sname) { - s := "rpc Register: type " + sname + " is not exported" - if logger.V(logger.ErrorLevel) { - logger.Error(s) - } - return fmt.Errorf(s) + return fmt.Errorf("rpc Register: type %s is not exported", sname) } if _, present := server.serviceMap[sname]; present { return fmt.Errorf("rpc: service already defined: " + sname) @@ -177,17 +151,16 @@ func (server *rServer) register(rcvr interface{}) error { // Install the methods for m := 0; m < s.typ.NumMethod(); m++ { method := s.typ.Method(m) - if mt := prepareEndpoint(method); mt != nil { + mt, err := prepareEndpoint(method) + if mt != nil && err == nil { s.method[method.Name] = mt + } else if err != nil { + return err } } if len(s.method) == 0 { - s := "rpc Register: type " + sname + " has no exported methods of suitable type" - if logger.V(logger.ErrorLevel) { - logger.Error(s) - } - return fmt.Errorf(s) + return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname) } server.serviceMap[s.name] = s return nil