diff --git a/go.mod b/go.mod index 34178f0..722f149 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/golang/protobuf v1.5.2 go.unistack.org/micro/v3 v3.10.14 - golang.org/x/net v0.5.0 + golang.org/x/net v0.7.0 google.golang.org/grpc v1.52.3 google.golang.org/protobuf v1.28.1 ) diff --git a/go.sum b/go.sum index a935c34..9918693 100644 --- a/go.sum +++ b/go.sum @@ -609,8 +609,8 @@ golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= -golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= -golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -714,12 +714,12 @@ golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= -golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= -golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -732,8 +732,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= -golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/grpc.go b/grpc.go index da533d4..2be599c 100644 --- a/grpc.go +++ b/grpc.go @@ -39,13 +39,13 @@ const ( ) /* -type ServerReflection struct { +type grpcServerReflection struct { srv *grpc.Server s *serverReflectionServer } */ -type Server struct { +type grpcServer struct { handlers map[string]server.Handler srv *grpc.Server exit chan chan error @@ -60,9 +60,9 @@ type Server struct { reflection bool } -func newServer(opts ...server.Option) *Server { +func newGRPCServer(opts ...server.Option) server.Server { // create a grpc server - g := &Server{ + g := &grpcServer{ opts: server.NewOptions(opts...), rpc: &rServer{ serviceMap: make(map[string]*service), @@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se */ -func (g *Server) configure(opts ...server.Option) error { +func (g *grpcServer) configure(opts ...server.Option) error { g.Lock() defer g.Unlock() @@ -128,10 +128,6 @@ func (g *Server) configure(opts ...server.Option) error { } } - for _, k := range g.opts.Codecs { - encoding.RegisterCodec(&wrapMicroCodec{k}) - } - maxMsgSize := g.getMaxMsgSize() gopts := []grpc.ServerOption{ @@ -169,7 +165,7 @@ func (g *Server) configure(opts ...server.Option) error { return nil } -func (g *Server) getMaxMsgSize() int { +func (g *grpcServer) getMaxMsgSize() int { if g.opts.Context == nil { return codec.DefaultMaxMsgSize } @@ -180,14 +176,14 @@ func (g *Server) getMaxMsgSize() int { return s } -func (g *Server) getCredentials() credentials.TransportCredentials { +func (g *grpcServer) getCredentials() credentials.TransportCredentials { if g.opts.TLSConfig != nil { return credentials.NewTLS(g.opts.TLSConfig) } return nil } -func (g *Server) getGrpcOptions() []grpc.ServerOption { +func (g *grpcServer) getGrpcOptions() []grpc.ServerOption { if g.opts.Context == nil { return nil } @@ -200,7 +196,7 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption { return opts } -func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) { +func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) { fullMethod, ok := grpc.MethodFromServerStream(stream) if !ok { return status.Errorf(codes.Internal, "method does not exist in context") @@ -306,7 +302,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) /* if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { - rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} + rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} svc = &service{} svc.typ = reflect.TypeOf(rfl) svc.rcvr = reflect.ValueOf(rfl) @@ -354,7 +350,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) return g.processStream(ctx, stream, svc, mtype, ct) } -func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { +func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { // for { var err error var argv, replyv reflect.Value @@ -497,12 +493,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error { return s.stream.Recv(m) } -func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { +func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { return g.s.ServerReflectionInfo(&reflectStream{stream}) } */ -func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { +func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { opts := g.opts r := &rpcRequest{ @@ -576,7 +572,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se return status.New(statusCode, statusDesc).Err() } -func (g *Server) newCodec(ct string) (codec.Codec, error) { +func (g *grpcServer) newCodec(ct string) (codec.Codec, error) { g.RLock() defer g.RUnlock() @@ -591,7 +587,7 @@ func (g *Server) newCodec(ct string) (codec.Codec, error) { return nil, codec.ErrUnknownContentType } -func (g *Server) Options() server.Options { +func (g *grpcServer) Options() server.Options { g.RLock() opts := g.opts g.RUnlock() @@ -599,15 +595,15 @@ func (g *Server) Options() server.Options { return opts } -func (g *Server) Init(opts ...server.Option) error { +func (g *grpcServer) Init(opts ...server.Option) error { return g.configure(opts...) } -func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { +func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { return newRPCHandler(h, opts...) } -func (g *Server) Handle(h server.Handler) error { +func (g *grpcServer) Handle(h server.Handler) error { if err := g.rpc.register(h.Handler()); err != nil { return err } @@ -616,11 +612,11 @@ func (g *Server) Handle(h server.Handler) error { return nil } -func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { +func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { return newSubscriber(topic, sb, opts...) } -func (g *Server) Subscribe(sb server.Subscriber) error { +func (g *grpcServer) Subscribe(sb server.Subscriber) error { sub, ok := sb.(*subscriber) if !ok { return fmt.Errorf("invalid subscriber: expected *subscriber") @@ -644,7 +640,7 @@ func (g *Server) Subscribe(sb server.Subscriber) error { return nil } -func (g *Server) Register() error { +func (g *grpcServer) Register() error { g.RLock() rsvc := g.rsvc config := g.opts @@ -749,7 +745,7 @@ func (g *Server) Register() error { return nil } -func (g *Server) Deregister() error { +func (g *grpcServer) Deregister() error { var err error g.RLock() @@ -803,7 +799,7 @@ func (g *Server) Deregister() error { return nil } -func (g *Server) Start() error { +func (g *grpcServer) Start() error { g.RLock() if g.started { g.RUnlock() @@ -813,6 +809,10 @@ func (g *Server) Start() error { config := g.Options() + for _, k := range config.Codecs { + encoding.RegisterCodec(&wrapMicroCodec{k}) + } + // micro: config.Transport.Listen(config.Address) var ts net.Listener var err error @@ -987,7 +987,7 @@ func (g *Server) Start() error { return nil } -func (g *Server) Stop() error { +func (g *grpcServer) Stop() error { g.RLock() if !g.started { g.RUnlock() @@ -1007,18 +1007,14 @@ func (g *Server) Stop() error { return err } -func (g *Server) String() string { +func (g *grpcServer) String() string { return "grpc" } -func (g *Server) Name() string { +func (g *grpcServer) Name() string { return g.opts.Name } -func (g *Server) GRPCServer() *grpc.Server { - return g.srv -} - -func NewServer(opts ...server.Option) *Server { - return newServer(opts...) +func NewServer(opts ...server.Option) server.Server { + return newGRPCServer(opts...) } diff --git a/subscriber.go b/subscriber.go index 94f11f5..b29e691 100644 --- a/subscriber.go +++ b/subscriber.go @@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio } } -func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { +func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { return func(p broker.Event) (err error) { defer func() { if r := recover(); r != nil {