fixup for never micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-10-16 12:29:28 +03:00
parent bc66ccb9d3
commit e6f7ceb900
4 changed files with 16 additions and 25 deletions

31
grpc.go
View File

@@ -23,7 +23,6 @@ import (
"github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/util/backoff"
mgrpc "github.com/unistack-org/micro/v3/util/grpc"
regutil "github.com/unistack-org/micro/v3/util/registry"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -82,7 +81,7 @@ func init() {
func newGRPCServer(opts ...server.Option) server.Server {
// create a grpc server
g := &grpcServer{
opts: server.NewOptions(),
opts: server.NewOptions(opts...),
rpc: &rServer{
serviceMap: make(map[string]*service),
},
@@ -91,10 +90,6 @@ func newGRPCServer(opts ...server.Option) server.Server {
exit: make(chan chan error),
}
for _, o := range opts {
o(&g.opts)
}
return g
}
@@ -115,11 +110,6 @@ func (g *grpcServer) configure(opts ...server.Option) error {
g.Lock()
defer g.Unlock()
// Don't reprocess if server created
if g.srv != nil {
return nil
}
for _, o := range opts {
o(&g.opts)
}
@@ -720,7 +710,7 @@ func (g *grpcServer) Register() error {
var service *registry.Service
var cacheService bool
service, err = regutil.NewService(g)
service, err = server.NewRegistryService(g)
if err != nil {
return err
}
@@ -791,16 +781,17 @@ func (g *grpcServer) Register() error {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if logger.V(logger.InfoLevel) {
logger.Infof("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
@@ -822,7 +813,7 @@ func (g *grpcServer) Deregister() error {
config := g.opts
g.RUnlock()
service, err := regutil.NewService(g)
service, err := server.NewRegistryService(g)
if err != nil {
return err
}
@@ -855,7 +846,7 @@ func (g *grpcServer) Deregister() error {
if logger.V(logger.InfoLevel) {
logger.Infof("Unsubscribing from topic: %s", s.Topic())
}
if err := s.Unsubscribe(); err != nil {
if err := s.Unsubscribe(g.opts.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err)
}
@@ -919,7 +910,7 @@ func (g *grpcServer) Start() error {
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(); err != nil {
if err := config.Broker.Connect(config.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
@@ -932,7 +923,7 @@ func (g *grpcServer) Start() error {
}
// use RegisterCheck func before register
if err := g.opts.RegisterCheck(g.opts.Context); err != nil {
if err := g.opts.RegisterCheck(config.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
@@ -1040,7 +1031,7 @@ func (g *grpcServer) Start() error {
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(); err != nil {
if err := config.Broker.Disconnect(config.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}