diff --git a/go.mod b/go.mod index c277b90..5e8cc24 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,9 @@ go 1.15 require ( github.com/golang/protobuf v1.5.1 - github.com/unistack-org/micro/v3 v3.2.26 + github.com/unistack-org/micro/v3 v3.3.0 golang.org/x/net v0.0.0-20210324051636-2c4c8ecb7826 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect - google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d // indirect google.golang.org/grpc v1.36.0 google.golang.org/protobuf v1.26.0 ) diff --git a/go.sum b/go.sum index b7e623b..b0bc5a7 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/unistack-org/micro/v3 v3.2.26 h1:tWCDuLRb1GDjY2nxbah0jm6n+sBiPxlgcRFXX7n73Q0= -github.com/unistack-org/micro/v3 v3.2.26/go.mod h1:iJwCWq2PECMxigfqe6TPC5GLWvj6P94Kk+PTVZGL3w8= +github.com/unistack-org/micro/v3 v3.3.0 h1:pEj/8QVFzMlNMEL//q/Te8qgG+XI6LTYIQrb6hMymgk= +github.com/unistack-org/micro/v3 v3.3.0/go.mod h1:iJwCWq2PECMxigfqe6TPC5GLWvj6P94Kk+PTVZGL3w8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -53,7 +53,6 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 h1:b0LrWgu8+q7z4J+0Y3Umo5q1dL7NXBkKBWkaVkAq17E= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210324051636-2c4c8ecb7826 h1:lNRDRnwZWawoPHDS50ebYHTOHjctRMLSrUSQFcAHiW4= golang.org/x/net v0.0.0-20210324051636-2c4c8ecb7826/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= @@ -82,9 +81,8 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d h1:92D1fum1bJLKSdr11OJ+54YeCMCGYIygTA7R/YZxH5M= -google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -99,7 +97,6 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= diff --git a/grpc.go b/grpc.go index a3944b8..304dafd 100644 --- a/grpc.go +++ b/grpc.go @@ -53,6 +53,7 @@ type grpcServer struct { opts server.Options handlers map[string]server.Handler subscribers map[*subscriber][]broker.Subscriber + init bool // marks the serve as started started bool // used for first registration @@ -80,19 +81,22 @@ func newGRPCServer(opts ...server.Option) server.Server { return g } +/* type grpcRouter struct { - h func(context.Context, server.Request, interface{}) error - m func(context.Context, server.Message) error + h func(context.Context, server.Request, interface{}) error + m func(context.Context, server.Message) error } func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error { - return r.m(ctx, msg) + return r.m(ctx, msg) } func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { - return r.h(ctx, req, rsp) + return r.h(ctx, req, rsp) } +*/ + func (g *grpcServer) configure(opts ...server.Option) error { g.Lock() defer g.Unlock() @@ -101,12 +105,26 @@ func (g *grpcServer) configure(opts ...server.Option) error { o(&g.opts) } - if g.opts.Register == nil { - return fmt.Errorf("register not set") + if err := g.opts.Register.Init(); err != nil { + return err } - - if g.opts.Broker == nil { - return fmt.Errorf("broker not set") + if err := g.opts.Broker.Init(); err != nil { + return err + } + if err := g.opts.Tracer.Init(); err != nil { + return err + } + if err := g.opts.Auth.Init(); err != nil { + return err + } + if err := g.opts.Logger.Init(); err != nil { + return err + } + if err := g.opts.Meter.Init(); err != nil { + return err + } + if err := g.opts.Transport.Init(); err != nil { + return err } g.wg = g.opts.Wait @@ -153,6 +171,8 @@ func (g *grpcServer) configure(opts ...server.Option) error { return g.Start() } + g.init = true + return nil } @@ -270,58 +290,6 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err } } - // process via router - if g.opts.Router != nil { - cf, err := g.newCodec(ct) - if err != nil { - return errors.InternalServerError(g.opts.Name, err.Error()) - } - - // create a client.Request - request := &rpcRequest{ - rw: &wrapStream{stream}, - service: serviceFromMethod(fullMethod), - contentType: ct, - method: fmt.Sprintf("%s.%s", serviceName, methodName), - endpoint: fmt.Sprintf("%s.%s", serviceName, methodName), - target: g.opts.Name, - codec: cf, - stream: true, - } - - response := &rpcResponse{ - method: fmt.Sprintf("%s.%s", serviceName, methodName), - endpoint: fmt.Sprintf("%s.%s", serviceName, methodName), - target: g.opts.Name, - rw: &wrapStream{stream}, - header: make(map[string]string), - codec: cf, - } - - // create a wrapped function - handler := func(ctx context.Context, req server.Request, rsp interface{}) error { - return g.opts.Router.ServeRequest(ctx, req, rsp.(server.Response)) - } - - // execute the wrapper for it - for i := len(g.opts.HdlrWrappers); i > 0; i-- { - handler = g.opts.HdlrWrappers[i-1](handler) - } - - r := grpcRouter{h: handler} - - // serve the actual request using the request router - if err := r.ServeRequest(ctx, request, response); err != nil { - if _, ok := status.FromError(err); ok { - return err - } - return status.Errorf(codes.Internal, err.Error()) - } - - return nil - } - - // process the standard request flow g.rpc.mu.RLock() svc := g.rpc.serviceMap[serviceName] g.rpc.mu.RUnlock() @@ -597,6 +565,9 @@ func (g *grpcServer) Options() server.Options { } func (g *grpcServer) Init(opts ...server.Option) error { + if len(opts) == 0 && g.init { + return nil + } return g.configure(opts...) }