diff --git a/grpc.go b/grpc.go index da533d4..d569bdc 100644 --- a/grpc.go +++ b/grpc.go @@ -718,31 +718,6 @@ func (g *Server) Register() error { g.Lock() defer g.Unlock() - for sb := range g.subscribers { - handler := g.createSubHandler(sb, config) - var opts []broker.SubscribeOption - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - subCtx := config.Context - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - opts = append(opts, broker.SubscribeContext(subCtx)) - opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) - opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly)) - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic()) - } - sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) - if err != nil { - return err - } - g.subscribers[sb] = []broker.Subscriber{sub} - } - g.registered = true g.rsvc = service @@ -876,6 +851,10 @@ func (g *Server) Start() error { } } + if err = g.subscribe(); err != nil { + return err + } + // micro: go ts.Accept(s.accept) go func() { if err = g.srv.Serve(ts); err != nil { @@ -987,6 +966,37 @@ func (g *Server) Start() error { return nil } +func (g *Server) subscribe() error { + config := g.opts + + for sb := range g.subscribers { + handler := g.createSubHandler(sb, config) + var opts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + subCtx := config.Context + if cx := sb.Options().Context; cx != nil { + subCtx = cx + } + opts = append(opts, broker.SubscribeContext(subCtx)) + opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) + opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly)) + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic()) + } + sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) + if err != nil { + return err + } + g.subscribers[sb] = []broker.Subscriber{sub} + } + + return nil +} + func (g *Server) Stop() error { g.RLock() if !g.started {