server/noop: fix graceful unsubscribe #218
@ -202,39 +202,6 @@ func (n *noopServer) Register() error {
|
|||||||
n.Lock()
|
n.Lock()
|
||||||
defer n.Unlock()
|
defer n.Unlock()
|
||||||
|
|
||||||
cx := config.Context
|
|
||||||
|
|
||||||
var sub broker.Subscriber
|
|
||||||
|
|
||||||
for sb := range n.subscribers {
|
|
||||||
if sb.Options().Context != nil {
|
|
||||||
cx = sb.Options().Context
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
if sb.Options().Batch {
|
|
||||||
// batch processing handler
|
|
||||||
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.newBatchSubHandler(sb, config), opts...)
|
|
||||||
} else {
|
|
||||||
// single processing handler
|
|
||||||
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.newSubHandler(sb, config), opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
|
|
||||||
n.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
n.registered = true
|
n.registered = true
|
||||||
if cacheService {
|
if cacheService {
|
||||||
n.rsvc = service
|
n.rsvc = service
|
||||||
@ -366,6 +333,10 @@ func (n *noopServer) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := n.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
t := new(time.Ticker)
|
t := new(time.Ticker)
|
||||||
|
|
||||||
@ -449,6 +420,45 @@ func (n *noopServer) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *noopServer) subscribe() error {
|
||||||
|
config := n.Options()
|
||||||
|
|
||||||
|
cx := config.Context
|
||||||
|
var err error
|
||||||
|
var sub broker.Subscriber
|
||||||
|
|
||||||
|
for sb := range n.subscribers {
|
||||||
|
if sb.Options().Context != nil {
|
||||||
|
cx = sb.Options().Context
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []broker.SubscribeOption{broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)}
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if sb.Options().Batch {
|
||||||
|
// batch processing handler
|
||||||
|
sub, err = config.Broker.BatchSubscribe(cx, sb.Topic(), n.createBatchSubHandler(sb, config), opts...)
|
||||||
|
} else {
|
||||||
|
// single processing handler
|
||||||
|
sub, err = config.Broker.Subscribe(cx, sb.Topic(), n.createSubHandler(sb, config), opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(n.opts.Context, "subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
|
||||||
|
n.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (n *noopServer) Stop() error {
|
func (n *noopServer) Stop() error {
|
||||||
n.RLock()
|
n.RLock()
|
||||||
if !n.started {
|
if !n.started {
|
||||||
|
@ -191,7 +191,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
func (n *noopServer) createBatchSubHandler(sb *subscriber, opts Options) broker.BatchHandler {
|
||||||
return func(ps broker.Events) (err error) {
|
return func(ps broker.Events) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
@ -309,7 +309,7 @@ func (n *noopServer) newBatchSubHandler(sb *subscriber, opts Options) broker.Bat
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:gocyclo
|
//nolint:gocyclo
|
||||||
func (n *noopServer) newSubHandler(sb *subscriber, opts Options) broker.Handler {
|
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user