@@ -210,8 +210,15 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
||||
}
|
||||
case func(broker.Message) error:
|
||||
for _, message := range messages {
|
||||
if err = s(message); err == nil && sub.opts.AutoAck {
|
||||
err = message.Ack()
|
||||
msg, ok := message.(*memoryMessage)
|
||||
if !ok {
|
||||
if b.opts.Logger.V(logger.ErrorLevel) {
|
||||
b.opts.Logger.Error(ctx, "broker handler error", broker.ErrInvalidMessage)
|
||||
}
|
||||
}
|
||||
msg.topic = topic
|
||||
if err = s(msg); err == nil && sub.opts.AutoAck {
|
||||
err = msg.Ack()
|
||||
}
|
||||
if err != nil {
|
||||
if b.opts.Logger.V(logger.ErrorLevel) {
|
||||
|
||||
Reference in New Issue
Block a user