From c2eedbe3b50e4cb4def53453271dc96947f94c1a Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 26 Apr 2021 18:48:54 +0300 Subject: [PATCH] lint Signed-off-by: Vasiliy Tolstov --- .golangci.yml | 44 ++++++++ codec.go | 36 ++----- error.go | 31 +++--- grpc.go | 285 +++++++++++++++++++++++++------------------------- handler.go | 8 +- options.go | 10 +- reflection.go | 2 + request.go | 25 +++-- response.go | 13 ++- server.go | 31 +++--- subscriber.go | 10 +- util.go | 2 + 12 files changed, 261 insertions(+), 236 deletions(-) create mode 100644 .golangci.yml diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..6ff842d --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,44 @@ +run: + concurrency: 4 + deadline: 5m + issues-exit-code: 1 + tests: true + +linters-settings: + govet: + check-shadowing: true + enable: + - fieldalignment + +linters: + enable: + - govet + - deadcode + - errcheck + - govet + - ineffassign + - staticcheck + - structcheck + - typecheck + - unused + - varcheck + - bodyclose + - gci + - goconst + - gocritic + - gosimple + - gofmt + - gofumpt + - goimports + - golint + - gosec + - makezero + - misspell + - nakedret + - nestif + - nilerr + - noctx + - prealloc + - unconvert + - unparam + disable-all: false diff --git a/codec.go b/codec.go index 94412d6..07a846a 100644 --- a/codec.go +++ b/codec.go @@ -4,25 +4,9 @@ import ( "io" "github.com/unistack-org/micro/v3/codec" - "google.golang.org/grpc" "google.golang.org/grpc/encoding" ) -type wrapStream struct{ grpc.ServerStream } - -func (w *wrapStream) Write(d []byte) (int, error) { - n := len(d) - err := w.ServerStream.SendMsg(&codec.Frame{Data: d}) - return n, err -} - -func (w *wrapStream) Read(d []byte) (int, error) { - m := &codec.Frame{} - err := w.ServerStream.RecvMsg(m) - d = m.Data - return len(d), err -} - type wrapMicroCodec struct{ codec.Codec } func (w *wrapMicroCodec) Name() string { @@ -36,43 +20,39 @@ func (w *wrapGrpcCodec) String() string { } func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) { - switch m := v.(type) { - case *codec.Frame: + if m, ok := v.(*codec.Frame); ok { return m.Data, nil } return w.Codec.Marshal(v) } -func (w wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error { +func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error { if d == nil || v == nil { return nil } - switch m := v.(type) { - case *codec.Frame: + if m, ok := v.(*codec.Frame); ok { m.Data = d return nil } return w.Codec.Unmarshal(d, v) } -func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { +func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { return nil } -func (g *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { - // caller has requested a frame - switch m := v.(type) { - case *codec.Frame: +func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { + if m, ok := v.(*codec.Frame); ok { _, err := conn.Read(m.Data) return err } return codec.ErrInvalidMessage } -func (g *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { +func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { // if we don't have a body if v != nil { - b, err := g.Marshal(v) + b, err := w.Marshal(v) if err != nil { return err } diff --git a/error.go b/error.go index a5fcfbc..df06f11 100644 --- a/error.go +++ b/error.go @@ -10,21 +10,19 @@ import ( "google.golang.org/grpc/codes" ) -var ( - errMapping = map[int32]codes.Code{ - http.StatusOK: codes.OK, - http.StatusBadRequest: codes.InvalidArgument, - http.StatusRequestTimeout: codes.DeadlineExceeded, - http.StatusNotFound: codes.NotFound, - http.StatusConflict: codes.AlreadyExists, - http.StatusForbidden: codes.PermissionDenied, - http.StatusUnauthorized: codes.Unauthenticated, - http.StatusPreconditionFailed: codes.FailedPrecondition, - http.StatusNotImplemented: codes.Unimplemented, - http.StatusInternalServerError: codes.Internal, - http.StatusServiceUnavailable: codes.Unavailable, - } -) +var errMapping = map[int32]codes.Code{ + http.StatusOK: codes.OK, + http.StatusBadRequest: codes.InvalidArgument, + http.StatusRequestTimeout: codes.DeadlineExceeded, + http.StatusNotFound: codes.NotFound, + http.StatusConflict: codes.AlreadyExists, + http.StatusForbidden: codes.PermissionDenied, + http.StatusUnauthorized: codes.Unauthenticated, + http.StatusPreconditionFailed: codes.FailedPrecondition, + http.StatusNotImplemented: codes.Unimplemented, + http.StatusInternalServerError: codes.Internal, + http.StatusServiceUnavailable: codes.Unavailable, +} // convertCode converts a standard Go error into its canonical code. Note that // this is only used to translate the error returned by the server applications. @@ -60,8 +58,7 @@ func microError(err error) codes.Code { } var ec int32 - switch verr := err.(type) { - case *errors.Error: + if verr, ok := err.(*errors.Error); ok { ec = verr.Code } diff --git a/grpc.go b/grpc.go index 9482039..759a8d0 100644 --- a/grpc.go +++ b/grpc.go @@ -14,6 +14,7 @@ import ( "sync" "time" + // nolint: staticcheck oldproto "github.com/golang/protobuf/proto" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" @@ -29,7 +30,6 @@ import ( "google.golang.org/grpc/encoding" gmetadata "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" - grpcreflect "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) @@ -38,32 +38,27 @@ const ( defaultContentType = "application/grpc" ) +/* type grpcServerReflection struct { srv *grpc.Server s *serverReflectionServer } +*/ type grpcServer struct { - rpc *rServer - srv *grpc.Server - exit chan chan error - wg *sync.WaitGroup - - sync.RWMutex - opts server.Options handlers map[string]server.Handler + srv *grpc.Server + exit chan chan error + wg *sync.WaitGroup + rsvc *register.Service subscribers map[*subscriber][]broker.Subscriber - init bool - // marks the serve as started - started bool - // used for first registration + rpc *rServer + opts server.Options + sync.RWMutex + init bool + started bool registered bool - reflection bool - // register service instance - rsvc *register.Service - - codecs map[string]codec.Codec } func newGRPCServer(opts ...server.Option) server.Server { @@ -292,25 +287,27 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err svc := g.rpc.serviceMap[serviceName] g.rpc.mu.RUnlock() - if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { - rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} - svc = &service{} - svc.typ = reflect.TypeOf(rfl) - svc.rcvr = reflect.ValueOf(rfl) - svc.name = reflect.Indirect(svc.rcvr).Type().Name() - svc.method = make(map[string]*methodType) - typ := reflect.TypeOf(rfl) - if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { - g.rpc.mu.Lock() - ep, err := prepareEndpoint(me) - if ep != nil && err != nil { - svc.method["ServerReflectionInfo"] = ep - } else if err != nil { - return status.New(codes.Unimplemented, err.Error()).Err() + /* + if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { + rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} + svc = &service{} + svc.typ = reflect.TypeOf(rfl) + svc.rcvr = reflect.ValueOf(rfl) + svc.name = reflect.Indirect(svc.rcvr).Type().Name() + svc.method = make(map[string]*methodType) + typ := reflect.TypeOf(rfl) + if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { + g.rpc.mu.Lock() + ep, err := prepareEndpoint(me) + if ep != nil && err != nil { + svc.method["ServerReflectionInfo"] = ep + } else if err != nil { + return status.New(codes.Unimplemented, err.Error()).Err() + } + g.rpc.mu.Unlock() } - g.rpc.mu.Unlock() } - } + */ if svc == nil { return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() @@ -323,117 +320,118 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err // process unary if !mtype.stream { - return g.processRequest(stream, svc, mtype, ct, ctx) + return g.processRequest(ctx, stream, svc, mtype, ct) } // process stream - return g.processStream(stream, svc, mtype, ct, ctx) + return g.processStream(ctx, stream, svc, mtype, ct) } -func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error { - for { - var argv, replyv reflect.Value +func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { + // for { + var argv, replyv reflect.Value - // Decode the argument value. - argIsValue := false // if true, need to indirect before calling. - if mtype.ArgType.Kind() == reflect.Ptr { - argv = reflect.New(mtype.ArgType.Elem()) - } else { - argv = reflect.New(mtype.ArgType) - argIsValue = true - } - - // Unmarshal request - if err := stream.RecvMsg(argv.Interface()); err != nil { - return err - } - - if argIsValue { - argv = argv.Elem() - } - - // reply value - replyv = reflect.New(mtype.ReplyType.Elem()) - - function := mtype.method.Func - var returnValues []reflect.Value - - cf, err := g.newCodec(ct) - if err != nil { - return errors.InternalServerError(g.opts.Name, err.Error()) - } - b, err := cf.Marshal(argv.Interface()) - if err != nil { - return err - } - - // create a client.Request - r := &rpcRequest{ - service: g.opts.Name, - contentType: ct, - method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), - body: b, - payload: argv.Interface(), - } - // define the handler func - fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { - returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) - - // The return value for the method is an error. - if rerr := returnValues[0].Interface(); rerr != nil { - err = rerr.(error) - } - - return err - } - - // wrap the handler func - for i := len(g.opts.HdlrWrappers); i > 0; i-- { - fn = g.opts.HdlrWrappers[i-1](fn) - } - - statusCode := codes.OK - statusDesc := "" - // execute the handler - if appErr := fn(ctx, r, replyv.Interface()); appErr != nil { - var errStatus *status.Status - switch verr := appErr.(type) { - case *errors.Error: - statusCode = microError(verr) - statusDesc = verr.Error() - errStatus = status.New(statusCode, statusDesc) - case proto.Message: - // user defined error that proto based we can attach it to grpc status - statusCode = convertCode(appErr) - statusDesc = appErr.Error() - errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr)) - if err != nil { - return err - } - default: - g.RLock() - config := g.opts - g.RUnlock() - if config.Logger.V(logger.ErrorLevel) { - config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") - } - // default case user pass own error type that not proto based - statusCode = convertCode(verr) - statusDesc = verr.Error() - errStatus = status.New(statusCode, statusDesc) - } - - return errStatus.Err() - } - - if err := stream.SendMsg(replyv.Interface()); err != nil { - return err - } - - return status.New(statusCode, statusDesc).Err() + // Decode the argument value. + argIsValue := false // if true, need to indirect before calling. + if mtype.ArgType.Kind() == reflect.Ptr { + argv = reflect.New(mtype.ArgType.Elem()) + } else { + argv = reflect.New(mtype.ArgType) + argIsValue = true } + + // Unmarshal request + if err := stream.RecvMsg(argv.Interface()); err != nil { + return err + } + + if argIsValue { + argv = argv.Elem() + } + + // reply value + replyv = reflect.New(mtype.ReplyType.Elem()) + + function := mtype.method.Func + var returnValues []reflect.Value + + cf, err := g.newCodec(ct) + if err != nil { + return errors.InternalServerError(g.opts.Name, err.Error()) + } + b, err := cf.Marshal(argv.Interface()) + if err != nil { + return err + } + + // create a client.Request + r := &rpcRequest{ + service: g.opts.Name, + contentType: ct, + method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), + body: b, + payload: argv.Interface(), + } + // define the handler func + fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) { + returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)}) + + // The return value for the method is an error. + if rerr := returnValues[0].Interface(); rerr != nil { + err = rerr.(error) + } + + return err + } + + // wrap the handler func + for i := len(g.opts.HdlrWrappers); i > 0; i-- { + fn = g.opts.HdlrWrappers[i-1](fn) + } + + statusCode := codes.OK + statusDesc := "" + // execute the handler + if appErr := fn(ctx, r, replyv.Interface()); appErr != nil { + var errStatus *status.Status + switch verr := appErr.(type) { + case *errors.Error: + statusCode = microError(verr) + statusDesc = verr.Error() + errStatus = status.New(statusCode, statusDesc) + case proto.Message: + // user defined error that proto based we can attach it to grpc status + statusCode = convertCode(appErr) + statusDesc = appErr.Error() + errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr)) + if err != nil { + return err + } + default: + g.RLock() + config := g.opts + g.RUnlock() + if config.Logger.V(logger.ErrorLevel) { + config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") + } + // default case user pass own error type that not proto based + statusCode = convertCode(verr) + statusDesc = verr.Error() + errStatus = status.New(statusCode, statusDesc) + } + + return errStatus.Err() + } + + if err := stream.SendMsg(replyv.Interface()); err != nil { + return err + } + + return status.New(statusCode, statusDesc).Err() + // } } +/* type reflectStream struct { stream server.Stream } @@ -475,8 +473,9 @@ func (s *reflectStream) RecvMsg(m interface{}) error { func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { return g.s.ServerReflectionInfo(&reflectStream{stream}) } +*/ -func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error { +func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { opts := g.opts r := &rpcRequest{ @@ -574,7 +573,7 @@ func (g *grpcServer) Init(opts ...server.Option) error { } func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { - return newRpcHandler(h, opts...) + return newRPCHandler(h, opts...) } func (g *grpcServer) Handle(h server.Handler) error { @@ -635,15 +634,15 @@ func (g *grpcServer) Register() error { g.RLock() // Maps are ordered randomly, sort the keys for consistency - var handlerList []string - for n, _ := range g.handlers { + handlerList := make([]string, 0, len(g.handlers)) + for n := range g.handlers { // Only advertise non internal handlers handlerList = append(handlerList, n) } sort.Strings(handlerList) - var subscriberList []*subscriber + subscriberList := make([]*subscriber, 0, len(g.subscribers)) for e := range g.subscribers { // Only advertise non internal subscribers subscriberList = append(subscriberList, e) @@ -662,7 +661,7 @@ func (g *grpcServer) Register() error { g.RUnlock() service.Nodes[0].Metadata["protocol"] = "grpc" - service.Nodes[0].Metadata["transport"] = "grpc" + service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] service.Endpoints = endpoints g.RLock() @@ -836,6 +835,7 @@ func (g *grpcServer) Start() error { } // use RegisterCheck func before register + // nolint: nestif if err := g.opts.RegisterCheck(config.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.Id, err) @@ -884,6 +884,7 @@ func (g *grpcServer) Start() error { registered := g.registered g.RUnlock() rerr := g.opts.RegisterCheck(g.opts.Context) + // nolint: nestif if rerr != nil && registered { if config.Logger.V(logger.ErrorLevel) { config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) diff --git a/handler.go b/handler.go index f099d55..5d99c49 100644 --- a/handler.go +++ b/handler.go @@ -8,13 +8,13 @@ import ( ) type rpcHandler struct { - name string - handler interface{} - endpoints []*register.Endpoint opts server.HandlerOptions + handler interface{} + name string + endpoints []*register.Endpoint } -func newRpcHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { +func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { options := server.NewHandlerOptions(opts...) typ := reflect.TypeOf(handler) diff --git a/options.go b/options.go index d121476..3501abb 100644 --- a/options.go +++ b/options.go @@ -8,10 +8,12 @@ import ( "google.golang.org/grpc/encoding" ) -type codecsKey struct{} -type grpcOptions struct{} -type maxMsgSizeKey struct{} -type reflectionKey struct{} +type ( + codecsKey struct{} + grpcOptions struct{} + maxMsgSizeKey struct{} + reflectionKey struct{} +) // gRPC Codec to be used to encode/decode requests for a given content type func Codec(contentType string, c encoding.Codec) server.Option { diff --git a/reflection.go b/reflection.go index fc78268..82482e9 100644 --- a/reflection.go +++ b/reflection.go @@ -1,3 +1,5 @@ +// +build ignore + /* * * Copyright 2016 gRPC authors. diff --git a/request.go b/request.go index 927c12f..b7e11c0 100644 --- a/request.go +++ b/request.go @@ -5,29 +5,34 @@ import ( "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" +) + +var ( + _ server.Request = &rpcRequest{} + _ server.Message = &rpcMessage{} ) type rpcRequest struct { rw io.ReadWriter - service string - method string - endpoint string - target string - contentType string + payload interface{} codec codec.Codec header metadata.Metadata + method string + endpoint string + contentType string + service string body []byte stream bool - payload interface{} } type rpcMessage struct { + payload interface{} + codec codec.Codec + header metadata.Metadata topic string contentType string - payload interface{} - header metadata.Metadata body []byte - codec codec.Codec } func (r *rpcRequest) ContentType() string { @@ -43,7 +48,7 @@ func (r *rpcRequest) Method() string { } func (r *rpcRequest) Endpoint() string { - return r.method + return r.endpoint } func (r *rpcRequest) Codec() codec.Codec { diff --git a/response.go b/response.go index 44f29af..acbc282 100644 --- a/response.go +++ b/response.go @@ -5,16 +5,15 @@ import ( "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" ) +var _ server.Response = &rpcResponse{} + type rpcResponse struct { - rw io.ReadWriter - header metadata.Metadata - codec codec.Codec - endpoint string - service string - method string - target string + rw io.ReadWriter + header metadata.Metadata + codec codec.Codec } func (r *rpcResponse) Codec() codec.Codec { diff --git a/server.go b/server.go index 84df065..842b883 100644 --- a/server.go +++ b/server.go @@ -17,34 +17,32 @@ import ( "github.com/unistack-org/micro/v3/server" ) -var ( - // Precompute the reflect type for error. Can't use error directly - // because Typeof takes an empty interface value. This is annoying. - typeOfError = reflect.TypeOf((*error)(nil)).Elem() -) +// Precompute the reflect type for error. Can't use error directly +// because Typeof takes an empty interface value. This is annoying. +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() type methodType struct { - method reflect.Method ArgType reflect.Type ReplyType reflect.Type ContextType reflect.Type + method reflect.Method stream bool } -type reflectionType func(context.Context, server.Stream) error +// type reflectionType func(context.Context, server.Stream) error type service struct { - name string // name of service - rcvr reflect.Value // receiver of methods for the service - typ reflect.Type // type of the receiver - method map[string]*methodType // registered methods + typ reflect.Type + method map[string]*methodType + rcvr reflect.Value + name string } // server represents an RPC Server. type rServer struct { - mu sync.RWMutex // protects the serviceMap serviceMap map[string]*service - reflection bool + mu sync.RWMutex + // reflection bool } // Is this an exported - upper case - name? @@ -91,15 +89,14 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) { return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) } - if stream { + switch stream { + case true: // check stream type streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() if !argType.Implements(streamType) { return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) } - } else { - // if not stream check the replyType - + default: // First arg need not be a pointer. if !isExportedOrBuiltinType(argType) { return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) diff --git a/subscriber.go b/subscriber.go index 98edd77..bb99f6b 100644 --- a/subscriber.go +++ b/subscriber.go @@ -15,14 +15,10 @@ import ( "github.com/unistack-org/micro/v3/server" ) -const ( - subSig = "func(context.Context, interface{}) error" -) - type handler struct { - method reflect.Value reqType reflect.Type ctxType reflect.Type + method reflect.Value } type subscriber struct { @@ -195,14 +191,14 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke if g.wg != nil { defer g.wg.Done() } - err := fn(ctx, &rpcMessage{ + cerr := fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, payload: req.Interface(), header: msg.Header, body: msg.Body, }) - results <- err + results <- cerr }() } var errors []string diff --git a/util.go b/util.go index f39da0a..75d424b 100644 --- a/util.go +++ b/util.go @@ -39,6 +39,7 @@ func serviceMethod(m string) (string, string, error) { return parts[0], parts[1], nil } +/* // ServiceFromMethod returns the service // /service.Foo/Bar => service func serviceFromMethod(m string) string { @@ -55,3 +56,4 @@ func serviceFromMethod(m string) string { parts = strings.Split(parts[1], ".") return strings.Join(parts[:len(parts)-1], ".") } +*/ -- 2.45.2