From e1af4aa3a40632c12c4ea22263b198b3abfa3579 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 25 May 2023 23:18:47 +0300 Subject: [PATCH] server/noop: fix graceful unsubscribe Signed-off-by: Vasiliy Tolstov --- server/noop.go | 76 +++++++++++++++++++++++++------------------- server/subscriber.go | 4 +-- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/server/noop.go b/server/noop.go index 8334cd2e..c397eee8 100644 --- a/server/noop.go +++ b/server/noop.go @@ -202,39 +202,6 @@ func (n *noopServer) Register() error { n.Lock() defer n.Unlock() - cx := config.Context - - var sub broker.Subscriber - - for sb := range n.subscribers { - if sb.Options().Context != nil { - cx = sb.Options().Context - } - - opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)} - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - if sb.Options().Batch { - // batch processing handler - sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...) - } else { - // single processing handler - sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...) - } - - if err != nil { - return err - } - - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) - } - - n.subscribers[sb] = []broker.Subscriber{sub} - } - n.registered = true if cacheService { n.rsvc = service @@ -366,6 +333,10 @@ func (n *noopServer) Start() error { } } + if err := n.subscribe(); err != nil { + return err + } + go func() { t := new(time.Ticker) @@ -449,6 +420,45 @@ func (n *noopServer) Start() error { return nil } +func (n *noopServer) subscribe() error { + config := n.Options() + + cx := config.Context + var err error + var sub broker.Subscriber + + for sb := range n.subscribers { + if sb.Options().Context != nil { + cx = sb.Options().Context + } + + opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)} + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + if sb.Options().Batch { + // batch processing handler + sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...) + } else { + // single processing handler + sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...) + } + + if err != nil { + return err + } + + if config.Logger.V(logger.InfoLevel) { + config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic()) + } + + n.subscribers[sb] = []broker.Subscriber{sub} + } + + return nil +} + func (n *noopServer) Stop() error { n.RLock() if !n.started { diff --git a/server/subscriber.go b/server/subscriber.go index 54b7a964..45dd8f67 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } //nolint:gocyclo -func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { +func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { return func(ps broker.Events) (err error) { defer func() { if r := recover(); r != nil { @@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat } //nolint:gocyclo -func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler { +func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { return func(p broker.Event) (err error) { defer func() { if r := recover(); r != nil {