server/noop: fix graceful unsubscribe #218

Merged
vtolstov merged 1 commits from unsubfix into v3 2023-05-25 23:19:27 +03:00
2 changed files with 45 additions and 35 deletions

View File

@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
cx := config.Context
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
n.registered = true n.registered = true
if cacheService { if cacheService {
n.rsvc = service n.rsvc = service
@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
} }
} }
if err := n.subscribe(); err != nil {
return err
}
go func() { go func() {
t := new(time.Ticker) t := new(time.Ticker)
@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
return nil return nil
} }
func (n *noopServer) subscribe() error {
config := n.Options()
cx := config.Context
var err error
var sub broker.Subscriber
for sb := range n.subscribers {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Batch {
// batch processing handler
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
} else {
// single processing handler
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
}
if err != nil {
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (n *noopServer) Stop() error { func (n *noopServer) Stop() error {
n.RLock() n.RLock()
if !n.started { if !n.started {

View File

@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
} }
//nolint:gocyclo //nolint:gocyclo
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler { func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
return func(ps broker.Events) (err error) { return func(ps broker.Events) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
} }
//nolint:gocyclo //nolint:gocyclo
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler { func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) { return func(p broker.Event) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {