diff --git a/grpc.go b/grpc.go index 2b0dc91..dd711d3 100644 --- a/grpc.go +++ b/grpc.go @@ -59,6 +59,9 @@ type grpcServer struct { started bool // used for first registration registered bool + + // registry service instance + rsvc *registry.Service } func init() { @@ -102,6 +105,9 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se } func (g *grpcServer) configure(opts ...server.Option) { + g.Lock() + defer g.Unlock() + // Don't reprocess where there's no config if len(opts) == 0 && g.srv != nil { return @@ -127,6 +133,7 @@ func (g *grpcServer) configure(opts ...server.Option) { gopts = append(gopts, opts...) } + g.rsvc = nil g.srv = grpc.NewServer(gopts...) } @@ -559,11 +566,24 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error { } func (g *grpcServer) Register() error { + + g.RLock() + rsvc := g.rsvc + config := g.opts + g.RUnlock() + + // if service already filled, reuse it and return early + if rsvc != nil { + rOpts := []registry.RegisterOption{registry.RegisterTTL(config.RegisterTTL)} + if err := config.Registry.Register(rsvc, rOpts...); err != nil { + return err + } + return nil + } + var err error var advt, host, port string - - // parse address for host, port - config := g.opts + var cacheService bool // check the advertise address first // if it exists then use it, otherwise @@ -584,16 +604,17 @@ func (g *grpcServer) Register() error { host = advt } + if ip := net.ParseIP(host); ip != nil { + cacheService = true + } + addr, err := addr.Extract(host) if err != nil { return err } // make copy of metadata - md := make(meta.Metadata) - for k, v := range config.Metadata { - md[k] = v - } + md := meta.Copy(config.Metadata) // register service node := ®istry.Node{ @@ -646,13 +667,13 @@ func (g *grpcServer) Register() error { Endpoints: endpoints, } - g.Lock() + g.RLock() registered := g.registered - g.Unlock() + g.RUnlock() if !registered { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) } } @@ -671,6 +692,9 @@ func (g *grpcServer) Register() error { g.Lock() defer g.Unlock() + if cacheService { + g.rsvc = service + } g.registered = true for sb := range g.subscribers { @@ -688,8 +712,8 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.DisableAutoAck()) } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Subscribing to topic: %s", sb.Topic()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debug("Subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { @@ -705,7 +729,9 @@ func (g *grpcServer) Deregister() error { var err error var advt, host, port string + g.RLock() config := g.opts + g.RUnlock() // check the advertise address first // if it exists then use it, otherwise @@ -742,14 +768,15 @@ func (g *grpcServer) Deregister() error { Nodes: []*registry.Node{node}, } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Deregistering node: %s", node.Id) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Deregistering node: %s", node.Id) } if err := config.Registry.Deregister(service); err != nil { return err } g.Lock() + g.rsvc = nil if !g.registered { g.Unlock() @@ -760,8 +787,8 @@ func (g *grpcServer) Deregister() error { for sb, subs := range g.subscribers { for _, sub := range subs { - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Unsubscribing from topic: %s", sub.Topic()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Unsubscribing from topic: %s", sub.Topic()) } sub.Unsubscribe() } @@ -819,11 +846,14 @@ func (g *grpcServer) Start() error { if len(g.subscribers) > 0 { // connect to the broker if err := config.Broker.Connect(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) + } return err } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } @@ -900,11 +930,15 @@ func (g *grpcServer) Start() error { // close transport ch <- nil - if logger.V(logger.InfoLevel, logger.DefaultLogger) { - logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker - config.Broker.Disconnect() + if err := config.Broker.Disconnect(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) + } + } }() // mark the server as started @@ -930,6 +964,7 @@ func (g *grpcServer) Stop() error { select { case err = <-ch: g.Lock() + g.rsvc = nil g.started = false g.Unlock() }