44
									
								
								.golangci.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								.golangci.yml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,44 @@ | |||||||
|  | run: | ||||||
|  |   concurrency: 4 | ||||||
|  |   deadline: 5m | ||||||
|  |   issues-exit-code: 1 | ||||||
|  |   tests: true | ||||||
|  |  | ||||||
|  | linters-settings: | ||||||
|  |   govet: | ||||||
|  |     check-shadowing: true | ||||||
|  |     enable: | ||||||
|  |       - fieldalignment | ||||||
|  |  | ||||||
|  | linters: | ||||||
|  |   enable: | ||||||
|  |     - govet | ||||||
|  |     - deadcode | ||||||
|  |     - errcheck | ||||||
|  |     - govet | ||||||
|  |     - ineffassign | ||||||
|  |     - staticcheck | ||||||
|  |     - structcheck | ||||||
|  |     - typecheck | ||||||
|  |     - unused | ||||||
|  |     - varcheck | ||||||
|  |     - bodyclose | ||||||
|  |     - gci | ||||||
|  |     - goconst | ||||||
|  |     - gocritic | ||||||
|  |     - gosimple | ||||||
|  |     - gofmt | ||||||
|  |     - gofumpt | ||||||
|  |     - goimports | ||||||
|  |     - golint | ||||||
|  |     - gosec | ||||||
|  |     - makezero | ||||||
|  |     - misspell | ||||||
|  |     - nakedret | ||||||
|  |     - nestif | ||||||
|  |     - nilerr | ||||||
|  |     - noctx | ||||||
|  |     - prealloc | ||||||
|  |     - unconvert | ||||||
|  |     - unparam | ||||||
|  |   disable-all: false | ||||||
							
								
								
									
										36
									
								
								codec.go
									
									
									
									
									
								
							
							
						
						
									
										36
									
								
								codec.go
									
									
									
									
									
								
							| @@ -4,25 +4,9 @@ import ( | |||||||
| 	"io" | 	"io" | ||||||
|  |  | ||||||
| 	"github.com/unistack-org/micro/v3/codec" | 	"github.com/unistack-org/micro/v3/codec" | ||||||
| 	"google.golang.org/grpc" |  | ||||||
| 	"google.golang.org/grpc/encoding" | 	"google.golang.org/grpc/encoding" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type wrapStream struct{ grpc.ServerStream } |  | ||||||
|  |  | ||||||
| func (w *wrapStream) Write(d []byte) (int, error) { |  | ||||||
| 	n := len(d) |  | ||||||
| 	err := w.ServerStream.SendMsg(&codec.Frame{Data: d}) |  | ||||||
| 	return n, err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (w *wrapStream) Read(d []byte) (int, error) { |  | ||||||
| 	m := &codec.Frame{} |  | ||||||
| 	err := w.ServerStream.RecvMsg(m) |  | ||||||
| 	d = m.Data |  | ||||||
| 	return len(d), err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| type wrapMicroCodec struct{ codec.Codec } | type wrapMicroCodec struct{ codec.Codec } | ||||||
|  |  | ||||||
| func (w *wrapMicroCodec) Name() string { | func (w *wrapMicroCodec) Name() string { | ||||||
| @@ -36,43 +20,39 @@ func (w *wrapGrpcCodec) String() string { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) { | func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) { | ||||||
| 	switch m := v.(type) { | 	if m, ok := v.(*codec.Frame); ok { | ||||||
| 	case *codec.Frame: |  | ||||||
| 		return m.Data, nil | 		return m.Data, nil | ||||||
| 	} | 	} | ||||||
| 	return w.Codec.Marshal(v) | 	return w.Codec.Marshal(v) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (w wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error { | func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error { | ||||||
| 	if d == nil || v == nil { | 	if d == nil || v == nil { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	switch m := v.(type) { | 	if m, ok := v.(*codec.Frame); ok { | ||||||
| 	case *codec.Frame: |  | ||||||
| 		m.Data = d | 		m.Data = d | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	return w.Codec.Unmarshal(d, v) | 	return w.Codec.Unmarshal(d, v) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { | func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (g *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { | func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { | ||||||
| 	// caller has requested a frame | 	if m, ok := v.(*codec.Frame); ok { | ||||||
| 	switch m := v.(type) { |  | ||||||
| 	case *codec.Frame: |  | ||||||
| 		_, err := conn.Read(m.Data) | 		_, err := conn.Read(m.Data) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	return codec.ErrInvalidMessage | 	return codec.ErrInvalidMessage | ||||||
| } | } | ||||||
|  |  | ||||||
| func (g *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { | func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { | ||||||
| 	// if we don't have a body | 	// if we don't have a body | ||||||
| 	if v != nil { | 	if v != nil { | ||||||
| 		b, err := g.Marshal(v) | 		b, err := w.Marshal(v) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										9
									
								
								error.go
									
									
									
									
									
								
							
							
						
						
									
										9
									
								
								error.go
									
									
									
									
									
								
							| @@ -10,8 +10,7 @@ import ( | |||||||
| 	"google.golang.org/grpc/codes" | 	"google.golang.org/grpc/codes" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var errMapping = map[int32]codes.Code{ | ||||||
| 	errMapping = map[int32]codes.Code{ |  | ||||||
| 	http.StatusOK:                  codes.OK, | 	http.StatusOK:                  codes.OK, | ||||||
| 	http.StatusBadRequest:          codes.InvalidArgument, | 	http.StatusBadRequest:          codes.InvalidArgument, | ||||||
| 	http.StatusRequestTimeout:      codes.DeadlineExceeded, | 	http.StatusRequestTimeout:      codes.DeadlineExceeded, | ||||||
| @@ -23,8 +22,7 @@ var ( | |||||||
| 	http.StatusNotImplemented:      codes.Unimplemented, | 	http.StatusNotImplemented:      codes.Unimplemented, | ||||||
| 	http.StatusInternalServerError: codes.Internal, | 	http.StatusInternalServerError: codes.Internal, | ||||||
| 	http.StatusServiceUnavailable:  codes.Unavailable, | 	http.StatusServiceUnavailable:  codes.Unavailable, | ||||||
| 	} | } | ||||||
| ) |  | ||||||
|  |  | ||||||
| // convertCode converts a standard Go error into its canonical code. Note that | // convertCode converts a standard Go error into its canonical code. Note that | ||||||
| // this is only used to translate the error returned by the server applications. | // this is only used to translate the error returned by the server applications. | ||||||
| @@ -60,8 +58,7 @@ func microError(err error) codes.Code { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var ec int32 | 	var ec int32 | ||||||
| 	switch verr := err.(type) { | 	if verr, ok := err.(*errors.Error); ok { | ||||||
| 	case *errors.Error: |  | ||||||
| 		ec = verr.Code | 		ec = verr.Code | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										57
									
								
								grpc.go
									
									
									
									
									
								
							
							
						
						
									
										57
									
								
								grpc.go
									
									
									
									
									
								
							| @@ -14,6 +14,7 @@ import ( | |||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	// nolint: staticcheck | ||||||
| 	oldproto "github.com/golang/protobuf/proto" | 	oldproto "github.com/golang/protobuf/proto" | ||||||
| 	"github.com/unistack-org/micro/v3/broker" | 	"github.com/unistack-org/micro/v3/broker" | ||||||
| 	"github.com/unistack-org/micro/v3/codec" | 	"github.com/unistack-org/micro/v3/codec" | ||||||
| @@ -29,7 +30,6 @@ import ( | |||||||
| 	"google.golang.org/grpc/encoding" | 	"google.golang.org/grpc/encoding" | ||||||
| 	gmetadata "google.golang.org/grpc/metadata" | 	gmetadata "google.golang.org/grpc/metadata" | ||||||
| 	"google.golang.org/grpc/peer" | 	"google.golang.org/grpc/peer" | ||||||
| 	grpcreflect "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" |  | ||||||
| 	"google.golang.org/grpc/status" | 	"google.golang.org/grpc/status" | ||||||
| 	"google.golang.org/protobuf/proto" | 	"google.golang.org/protobuf/proto" | ||||||
| ) | ) | ||||||
| @@ -38,32 +38,27 @@ const ( | |||||||
| 	defaultContentType = "application/grpc" | 	defaultContentType = "application/grpc" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | /* | ||||||
| type grpcServerReflection struct { | type grpcServerReflection struct { | ||||||
| 	srv *grpc.Server | 	srv *grpc.Server | ||||||
| 	s   *serverReflectionServer | 	s   *serverReflectionServer | ||||||
| } | } | ||||||
|  | */ | ||||||
|  |  | ||||||
| type grpcServer struct { | type grpcServer struct { | ||||||
| 	rpc  *rServer | 	handlers    map[string]server.Handler | ||||||
| 	srv         *grpc.Server | 	srv         *grpc.Server | ||||||
| 	exit        chan chan error | 	exit        chan chan error | ||||||
| 	wg          *sync.WaitGroup | 	wg          *sync.WaitGroup | ||||||
|  |  | ||||||
| 	sync.RWMutex |  | ||||||
| 	opts        server.Options |  | ||||||
| 	handlers    map[string]server.Handler |  | ||||||
| 	subscribers map[*subscriber][]broker.Subscriber |  | ||||||
| 	init        bool |  | ||||||
| 	// marks the serve as started |  | ||||||
| 	started bool |  | ||||||
| 	// used for first registration |  | ||||||
| 	registered bool |  | ||||||
|  |  | ||||||
| 	reflection bool |  | ||||||
| 	// register service instance |  | ||||||
| 	rsvc        *register.Service | 	rsvc        *register.Service | ||||||
|  | 	subscribers map[*subscriber][]broker.Subscriber | ||||||
| 	codecs map[string]codec.Codec | 	rpc         *rServer | ||||||
|  | 	opts        server.Options | ||||||
|  | 	sync.RWMutex | ||||||
|  | 	init       bool | ||||||
|  | 	started    bool | ||||||
|  | 	registered bool | ||||||
|  | 	reflection bool | ||||||
| } | } | ||||||
|  |  | ||||||
| func newGRPCServer(opts ...server.Option) server.Server { | func newGRPCServer(opts ...server.Option) server.Server { | ||||||
| @@ -292,6 +287,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err | |||||||
| 	svc := g.rpc.serviceMap[serviceName] | 	svc := g.rpc.serviceMap[serviceName] | ||||||
| 	g.rpc.mu.RUnlock() | 	g.rpc.mu.RUnlock() | ||||||
|  |  | ||||||
|  | 	/* | ||||||
| 		if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { | 		if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { | ||||||
| 			rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} | 			rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} | ||||||
| 			svc = &service{} | 			svc = &service{} | ||||||
| @@ -311,6 +307,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err | |||||||
| 				g.rpc.mu.Unlock() | 				g.rpc.mu.Unlock() | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 	*/ | ||||||
|  |  | ||||||
| 	if svc == nil { | 	if svc == nil { | ||||||
| 		return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() | 		return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() | ||||||
| @@ -323,15 +320,15 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err | |||||||
|  |  | ||||||
| 	// process unary | 	// process unary | ||||||
| 	if !mtype.stream { | 	if !mtype.stream { | ||||||
| 		return g.processRequest(stream, svc, mtype, ct, ctx) | 		return g.processRequest(ctx, stream, svc, mtype, ct) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// process stream | 	// process stream | ||||||
| 	return g.processStream(stream, svc, mtype, ct, ctx) | 	return g.processStream(ctx, stream, svc, mtype, ct) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error { | func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { | ||||||
| 	for { | 	// for { | ||||||
| 	var argv, replyv reflect.Value | 	var argv, replyv reflect.Value | ||||||
|  |  | ||||||
| 	// Decode the argument value. | 	// Decode the argument value. | ||||||
| @@ -431,9 +428,10 @@ func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return status.New(statusCode, statusDesc).Err() | 	return status.New(statusCode, statusDesc).Err() | ||||||
| 	} | 	//	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* | ||||||
| type reflectStream struct { | type reflectStream struct { | ||||||
| 	stream server.Stream | 	stream server.Stream | ||||||
| } | } | ||||||
| @@ -475,8 +473,9 @@ func (s *reflectStream) RecvMsg(m interface{}) error { | |||||||
| func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { | func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { | ||||||
| 	return g.s.ServerReflectionInfo(&reflectStream{stream}) | 	return g.s.ServerReflectionInfo(&reflectStream{stream}) | ||||||
| } | } | ||||||
|  | */ | ||||||
|  |  | ||||||
| func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error { | func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { | ||||||
| 	opts := g.opts | 	opts := g.opts | ||||||
|  |  | ||||||
| 	r := &rpcRequest{ | 	r := &rpcRequest{ | ||||||
| @@ -574,7 +573,7 @@ func (g *grpcServer) Init(opts ...server.Option) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { | func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { | ||||||
| 	return newRpcHandler(h, opts...) | 	return newRPCHandler(h, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (g *grpcServer) Handle(h server.Handler) error { | func (g *grpcServer) Handle(h server.Handler) error { | ||||||
| @@ -635,15 +634,15 @@ func (g *grpcServer) Register() error { | |||||||
|  |  | ||||||
| 	g.RLock() | 	g.RLock() | ||||||
| 	// Maps are ordered randomly, sort the keys for consistency | 	// Maps are ordered randomly, sort the keys for consistency | ||||||
| 	var handlerList []string | 	handlerList := make([]string, 0, len(g.handlers)) | ||||||
| 	for n, _ := range g.handlers { | 	for n := range g.handlers { | ||||||
| 		// Only advertise non internal handlers | 		// Only advertise non internal handlers | ||||||
| 		handlerList = append(handlerList, n) | 		handlerList = append(handlerList, n) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sort.Strings(handlerList) | 	sort.Strings(handlerList) | ||||||
|  |  | ||||||
| 	var subscriberList []*subscriber | 	subscriberList := make([]*subscriber, 0, len(g.subscribers)) | ||||||
| 	for e := range g.subscribers { | 	for e := range g.subscribers { | ||||||
| 		// Only advertise non internal subscribers | 		// Only advertise non internal subscribers | ||||||
| 		subscriberList = append(subscriberList, e) | 		subscriberList = append(subscriberList, e) | ||||||
| @@ -662,7 +661,7 @@ func (g *grpcServer) Register() error { | |||||||
| 	g.RUnlock() | 	g.RUnlock() | ||||||
|  |  | ||||||
| 	service.Nodes[0].Metadata["protocol"] = "grpc" | 	service.Nodes[0].Metadata["protocol"] = "grpc" | ||||||
| 	service.Nodes[0].Metadata["transport"] = "grpc" | 	service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] | ||||||
| 	service.Endpoints = endpoints | 	service.Endpoints = endpoints | ||||||
|  |  | ||||||
| 	g.RLock() | 	g.RLock() | ||||||
| @@ -836,6 +835,7 @@ func (g *grpcServer) Start() error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// use RegisterCheck func before register | 	// use RegisterCheck func before register | ||||||
|  | 	// nolint: nestif | ||||||
| 	if err := g.opts.RegisterCheck(config.Context); err != nil { | 	if err := g.opts.RegisterCheck(config.Context); err != nil { | ||||||
| 		if config.Logger.V(logger.ErrorLevel) { | 		if config.Logger.V(logger.ErrorLevel) { | ||||||
| 			config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.Id, err) | 			config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.Id, err) | ||||||
| @@ -884,6 +884,7 @@ func (g *grpcServer) Start() error { | |||||||
| 				registered := g.registered | 				registered := g.registered | ||||||
| 				g.RUnlock() | 				g.RUnlock() | ||||||
| 				rerr := g.opts.RegisterCheck(g.opts.Context) | 				rerr := g.opts.RegisterCheck(g.opts.Context) | ||||||
|  | 				// nolint: nestif | ||||||
| 				if rerr != nil && registered { | 				if rerr != nil && registered { | ||||||
| 					if config.Logger.V(logger.ErrorLevel) { | 					if config.Logger.V(logger.ErrorLevel) { | ||||||
| 						config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) | 						config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) | ||||||
|   | |||||||
| @@ -8,13 +8,13 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| type rpcHandler struct { | type rpcHandler struct { | ||||||
| 	name      string |  | ||||||
| 	handler   interface{} |  | ||||||
| 	endpoints []*register.Endpoint |  | ||||||
| 	opts      server.HandlerOptions | 	opts      server.HandlerOptions | ||||||
|  | 	handler   interface{} | ||||||
|  | 	name      string | ||||||
|  | 	endpoints []*register.Endpoint | ||||||
| } | } | ||||||
|  |  | ||||||
| func newRpcHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { | func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { | ||||||
| 	options := server.NewHandlerOptions(opts...) | 	options := server.NewHandlerOptions(opts...) | ||||||
|  |  | ||||||
| 	typ := reflect.TypeOf(handler) | 	typ := reflect.TypeOf(handler) | ||||||
|   | |||||||
							
								
								
									
										10
									
								
								options.go
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								options.go
									
									
									
									
									
								
							| @@ -8,10 +8,12 @@ import ( | |||||||
| 	"google.golang.org/grpc/encoding" | 	"google.golang.org/grpc/encoding" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type codecsKey struct{} | type ( | ||||||
| type grpcOptions struct{} | 	codecsKey     struct{} | ||||||
| type maxMsgSizeKey struct{} | 	grpcOptions   struct{} | ||||||
| type reflectionKey struct{} | 	maxMsgSizeKey struct{} | ||||||
|  | 	reflectionKey struct{} | ||||||
|  | ) | ||||||
|  |  | ||||||
| // gRPC Codec to be used to encode/decode requests for a given content type | // gRPC Codec to be used to encode/decode requests for a given content type | ||||||
| func Codec(contentType string, c encoding.Codec) server.Option { | func Codec(contentType string, c encoding.Codec) server.Option { | ||||||
|   | |||||||
| @@ -1,3 +1,5 @@ | |||||||
|  | // +build ignore | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * |  * | ||||||
|  * Copyright 2016 gRPC authors. |  * Copyright 2016 gRPC authors. | ||||||
|   | |||||||
							
								
								
									
										25
									
								
								request.go
									
									
									
									
									
								
							
							
						
						
									
										25
									
								
								request.go
									
									
									
									
									
								
							| @@ -5,29 +5,34 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/unistack-org/micro/v3/codec" | 	"github.com/unistack-org/micro/v3/codec" | ||||||
| 	"github.com/unistack-org/micro/v3/metadata" | 	"github.com/unistack-org/micro/v3/metadata" | ||||||
|  | 	"github.com/unistack-org/micro/v3/server" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	_ server.Request = &rpcRequest{} | ||||||
|  | 	_ server.Message = &rpcMessage{} | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type rpcRequest struct { | type rpcRequest struct { | ||||||
| 	rw          io.ReadWriter | 	rw          io.ReadWriter | ||||||
| 	service     string | 	payload     interface{} | ||||||
| 	method      string |  | ||||||
| 	endpoint    string |  | ||||||
| 	target      string |  | ||||||
| 	contentType string |  | ||||||
| 	codec       codec.Codec | 	codec       codec.Codec | ||||||
| 	header      metadata.Metadata | 	header      metadata.Metadata | ||||||
|  | 	method      string | ||||||
|  | 	endpoint    string | ||||||
|  | 	contentType string | ||||||
|  | 	service     string | ||||||
| 	body        []byte | 	body        []byte | ||||||
| 	stream      bool | 	stream      bool | ||||||
| 	payload     interface{} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| type rpcMessage struct { | type rpcMessage struct { | ||||||
|  | 	payload     interface{} | ||||||
|  | 	codec       codec.Codec | ||||||
|  | 	header      metadata.Metadata | ||||||
| 	topic       string | 	topic       string | ||||||
| 	contentType string | 	contentType string | ||||||
| 	payload     interface{} |  | ||||||
| 	header      metadata.Metadata |  | ||||||
| 	body        []byte | 	body        []byte | ||||||
| 	codec       codec.Codec |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) ContentType() string { | func (r *rpcRequest) ContentType() string { | ||||||
| @@ -43,7 +48,7 @@ func (r *rpcRequest) Method() string { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) Endpoint() string { | func (r *rpcRequest) Endpoint() string { | ||||||
| 	return r.method | 	return r.endpoint | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) Codec() codec.Codec { | func (r *rpcRequest) Codec() codec.Codec { | ||||||
|   | |||||||
| @@ -5,16 +5,15 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/unistack-org/micro/v3/codec" | 	"github.com/unistack-org/micro/v3/codec" | ||||||
| 	"github.com/unistack-org/micro/v3/metadata" | 	"github.com/unistack-org/micro/v3/metadata" | ||||||
|  | 	"github.com/unistack-org/micro/v3/server" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | var _ server.Response = &rpcResponse{} | ||||||
|  |  | ||||||
| type rpcResponse struct { | type rpcResponse struct { | ||||||
| 	rw     io.ReadWriter | 	rw     io.ReadWriter | ||||||
| 	header metadata.Metadata | 	header metadata.Metadata | ||||||
| 	codec  codec.Codec | 	codec  codec.Codec | ||||||
| 	endpoint string |  | ||||||
| 	service  string |  | ||||||
| 	method   string |  | ||||||
| 	target   string |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcResponse) Codec() codec.Codec { | func (r *rpcResponse) Codec() codec.Codec { | ||||||
|   | |||||||
							
								
								
									
										31
									
								
								server.go
									
									
									
									
									
								
							
							
						
						
									
										31
									
								
								server.go
									
									
									
									
									
								
							| @@ -17,34 +17,32 @@ import ( | |||||||
| 	"github.com/unistack-org/micro/v3/server" | 	"github.com/unistack-org/micro/v3/server" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | // Precompute the reflect type for error. Can't use error directly | ||||||
| 	// Precompute the reflect type for error. Can't use error directly | // because Typeof takes an empty interface value. This is annoying. | ||||||
| 	// because Typeof takes an empty interface value. This is annoying. | var typeOfError = reflect.TypeOf((*error)(nil)).Elem() | ||||||
| 	typeOfError = reflect.TypeOf((*error)(nil)).Elem() |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type methodType struct { | type methodType struct { | ||||||
| 	method      reflect.Method |  | ||||||
| 	ArgType     reflect.Type | 	ArgType     reflect.Type | ||||||
| 	ReplyType   reflect.Type | 	ReplyType   reflect.Type | ||||||
| 	ContextType reflect.Type | 	ContextType reflect.Type | ||||||
|  | 	method      reflect.Method | ||||||
| 	stream      bool | 	stream      bool | ||||||
| } | } | ||||||
|  |  | ||||||
| type reflectionType func(context.Context, server.Stream) error | // type reflectionType func(context.Context, server.Stream) error | ||||||
|  |  | ||||||
| type service struct { | type service struct { | ||||||
| 	name   string                 // name of service | 	typ    reflect.Type | ||||||
| 	rcvr   reflect.Value          // receiver of methods for the service | 	method map[string]*methodType | ||||||
| 	typ    reflect.Type           // type of the receiver | 	rcvr   reflect.Value | ||||||
| 	method map[string]*methodType // registered methods | 	name   string | ||||||
| } | } | ||||||
|  |  | ||||||
| // server represents an RPC Server. | // server represents an RPC Server. | ||||||
| type rServer struct { | type rServer struct { | ||||||
| 	mu         sync.RWMutex // protects the serviceMap |  | ||||||
| 	serviceMap map[string]*service | 	serviceMap map[string]*service | ||||||
| 	reflection bool | 	mu         sync.RWMutex | ||||||
|  | 	// reflection bool | ||||||
| } | } | ||||||
|  |  | ||||||
| // Is this an exported - upper case - name? | // Is this an exported - upper case - name? | ||||||
| @@ -91,15 +89,14 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) { | |||||||
| 		return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) | 		return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if stream { | 	switch stream { | ||||||
|  | 	case true: | ||||||
| 		// check stream type | 		// check stream type | ||||||
| 		streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() | 		streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() | ||||||
| 		if !argType.Implements(streamType) { | 		if !argType.Implements(streamType) { | ||||||
| 			return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) | 			return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) | ||||||
| 		} | 		} | ||||||
| 	} else { | 	default: | ||||||
| 		// if not stream check the replyType |  | ||||||
|  |  | ||||||
| 		// First arg need not be a pointer. | 		// First arg need not be a pointer. | ||||||
| 		if !isExportedOrBuiltinType(argType) { | 		if !isExportedOrBuiltinType(argType) { | ||||||
| 			return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) | 			return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) | ||||||
|   | |||||||
| @@ -15,14 +15,10 @@ import ( | |||||||
| 	"github.com/unistack-org/micro/v3/server" | 	"github.com/unistack-org/micro/v3/server" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( |  | ||||||
| 	subSig = "func(context.Context, interface{}) error" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type handler struct { | type handler struct { | ||||||
| 	method  reflect.Value |  | ||||||
| 	reqType reflect.Type | 	reqType reflect.Type | ||||||
| 	ctxType reflect.Type | 	ctxType reflect.Type | ||||||
|  | 	method  reflect.Value | ||||||
| } | } | ||||||
|  |  | ||||||
| type subscriber struct { | type subscriber struct { | ||||||
| @@ -195,14 +191,14 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke | |||||||
| 				if g.wg != nil { | 				if g.wg != nil { | ||||||
| 					defer g.wg.Done() | 					defer g.wg.Done() | ||||||
| 				} | 				} | ||||||
| 				err := fn(ctx, &rpcMessage{ | 				cerr := fn(ctx, &rpcMessage{ | ||||||
| 					topic:       sb.topic, | 					topic:       sb.topic, | ||||||
| 					contentType: ct, | 					contentType: ct, | ||||||
| 					payload:     req.Interface(), | 					payload:     req.Interface(), | ||||||
| 					header:      msg.Header, | 					header:      msg.Header, | ||||||
| 					body:        msg.Body, | 					body:        msg.Body, | ||||||
| 				}) | 				}) | ||||||
| 				results <- err | 				results <- cerr | ||||||
| 			}() | 			}() | ||||||
| 		} | 		} | ||||||
| 		var errors []string | 		var errors []string | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								util.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								util.go
									
									
									
									
									
								
							| @@ -39,6 +39,7 @@ func serviceMethod(m string) (string, string, error) { | |||||||
| 	return parts[0], parts[1], nil | 	return parts[0], parts[1], nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* | ||||||
| // ServiceFromMethod returns the service | // ServiceFromMethod returns the service | ||||||
| // /service.Foo/Bar => service | // /service.Foo/Bar => service | ||||||
| func serviceFromMethod(m string) string { | func serviceFromMethod(m string) string { | ||||||
| @@ -55,3 +56,4 @@ func serviceFromMethod(m string) string { | |||||||
| 	parts = strings.Split(parts[1], ".") | 	parts = strings.Split(parts[1], ".") | ||||||
| 	return strings.Join(parts[:len(parts)-1], ".") | 	return strings.Join(parts[:len(parts)-1], ".") | ||||||
| } | } | ||||||
|  | */ | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user