diff --git a/go.mod b/go.mod index 4172cd9..640b60d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/golang/protobuf v1.4.2 github.com/google/go-cmp v0.5.1 // indirect github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 - github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 + github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f golang.org/x/net v0.0.0-20200904194848-62affa334b73 golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 // indirect golang.org/x/text v0.3.3 // indirect diff --git a/go.sum b/go.sum index 53e29a9..281db2e 100644 --- a/go.sum +++ b/go.sum @@ -265,8 +265,8 @@ github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.m github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 h1:3d/HgT7Iq/UIw5OGyzfUeZPJwydhBohh9shyGJH14EA= -github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f h1:9PLnkfb9vdn1yHlKLIGo5AiSNzqerZscsm9R+uW+DAw= +github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/grpc.go b/grpc.go index 818038d..27b231e 100644 --- a/grpc.go +++ b/grpc.go @@ -23,7 +23,6 @@ import ( "github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/util/backoff" mgrpc "github.com/unistack-org/micro/v3/util/grpc" - regutil "github.com/unistack-org/micro/v3/util/registry" "golang.org/x/net/netutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -82,7 +81,7 @@ func init() { func newGRPCServer(opts ...server.Option) server.Server { // create a grpc server g := &grpcServer{ - opts: server.NewOptions(), + opts: server.NewOptions(opts...), rpc: &rServer{ serviceMap: make(map[string]*service), }, @@ -91,10 +90,6 @@ func newGRPCServer(opts ...server.Option) server.Server { exit: make(chan chan error), } - for _, o := range opts { - o(&g.opts) - } - return g } @@ -115,11 +110,6 @@ func (g *grpcServer) configure(opts ...server.Option) error { g.Lock() defer g.Unlock() - // Don't reprocess if server created - if g.srv != nil { - return nil - } - for _, o := range opts { o(&g.opts) } @@ -720,7 +710,7 @@ func (g *grpcServer) Register() error { var service *registry.Service var cacheService bool - service, err = regutil.NewService(g) + service, err = server.NewRegistryService(g) if err != nil { return err } @@ -791,16 +781,17 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.SubscribeGroup(queue)) } + subCtx := config.Context if cx := sb.Options().Context; cx != nil { - opts = append(opts, broker.SubscribeContext(cx)) + subCtx = cx } - + opts = append(opts, broker.SubscribeContext(subCtx)) opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) if logger.V(logger.InfoLevel) { logger.Infof("Subscribing to topic: %s", sb.Topic()) } - sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) if err != nil { return err } @@ -822,7 +813,7 @@ func (g *grpcServer) Deregister() error { config := g.opts g.RUnlock() - service, err := regutil.NewService(g) + service, err := server.NewRegistryService(g) if err != nil { return err } @@ -855,7 +846,7 @@ func (g *grpcServer) Deregister() error { if logger.V(logger.InfoLevel) { logger.Infof("Unsubscribing from topic: %s", s.Topic()) } - if err := s.Unsubscribe(); err != nil { + if err := s.Unsubscribe(g.opts.Context); err != nil { if logger.V(logger.ErrorLevel) { logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err) } @@ -919,7 +910,7 @@ func (g *grpcServer) Start() error { // only connect if we're subscribed if len(g.subscribers) > 0 { // connect to the broker - if err := config.Broker.Connect(); err != nil { + if err := config.Broker.Connect(config.Context); err != nil { if logger.V(logger.ErrorLevel) { logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) } @@ -932,7 +923,7 @@ func (g *grpcServer) Start() error { } // use RegisterCheck func before register - if err := g.opts.RegisterCheck(g.opts.Context); err != nil { + if err := g.opts.RegisterCheck(config.Context); err != nil { if logger.V(logger.ErrorLevel) { logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) } @@ -1040,7 +1031,7 @@ func (g *grpcServer) Start() error { logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker - if err := config.Broker.Disconnect(); err != nil { + if err := config.Broker.Disconnect(config.Context); err != nil { if logger.V(logger.ErrorLevel) { logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) } diff --git a/request.go b/request.go index 0e1383e..76f964d 100644 --- a/request.go +++ b/request.go @@ -1,8 +1,8 @@ package grpc import ( + raw "github.com/unistack-org/micro-codec-bytes" "github.com/unistack-org/micro/v3/codec" - "github.com/unistack-org/micro-codec-bytes" ) type rpcRequest struct { @@ -50,7 +50,7 @@ func (r *rpcRequest) Header() map[string]string { } func (r *rpcRequest) Read() ([]byte, error) { - f := &bytes.Frame{} + f := &raw.Frame{} if err := r.codec.ReadBody(f); err != nil { return nil, err }