diff --git a/broker/memory.go b/broker/memory.go index 2d72be0a..2b31408f 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -13,10 +13,9 @@ import ( ) type memoryBroker struct { - subscribers map[string][]*memorySubscriber - batchsubscribers map[string][]*memorySubscriber - addr string - opts Options + subscribers map[string][]*memorySubscriber + addr string + opts Options sync.RWMutex connected bool } @@ -147,76 +146,66 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . } func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error { - if len(m.batchsubscribers) > 0 { - eh := m.opts.BatchErrorHandler - - msgTopicMap := make(map[string]Events) - for _, v := range vs { - p := &memoryEvent{ - topic: v.topic, - message: v.body, - opts: m.opts, - } - msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) - } - - for t, ms := range msgTopicMap { - m.RLock() - subs, ok := m.batchsubscribers[t] - m.RUnlock() - if !ok { - continue - } - for _, sub := range subs { - if err := sub.batchhandler(ms); err != nil { - ms.SetError(err) - if sub.opts.BatchErrorHandler != nil { - eh = sub.opts.BatchErrorHandler - } - if eh != nil { - eh(ms) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, err.Error()) - } - } else if sub.opts.AutoAck { - if err := ms.Ack(); err != nil { - m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) - } - } - } - } - - } - - eh := m.opts.ErrorHandler + var err error + msgTopicMap := make(map[string]Events) for _, v := range vs { p := &memoryEvent{ topic: v.topic, message: v.body, opts: m.opts, } + msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) + } + beh := m.opts.BatchErrorHandler + eh := m.opts.ErrorHandler + + for t, ms := range msgTopicMap { m.RLock() - subs, ok := m.subscribers[p.topic] + subs, ok := m.subscribers[t] m.RUnlock() if !ok { continue } + for _, sub := range subs { - if err := sub.handler(p); err != nil { - p.SetError(err) - if sub.opts.ErrorHandler != nil { - eh = sub.opts.ErrorHandler + // batch processing + if sub.batchhandler != nil { + if err = sub.batchhandler(ms); err != nil { + ms.SetError(err) + if sub.opts.BatchErrorHandler != nil { + beh = sub.opts.BatchErrorHandler + } + if beh != nil { + beh(ms) + } else if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, err.Error()) + } + } else if sub.opts.AutoAck { + if err = ms.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } } - if eh != nil { - eh(p) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, err.Error()) - } - } else if sub.opts.AutoAck { - if err := p.Ack(); err != nil { - m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } + // single processing + if sub.handler != nil { + for _, p := range ms { + if err = sub.handler(p); err != nil { + p.SetError(err) + if sub.opts.ErrorHandler != nil { + eh = sub.opts.ErrorHandler + } + if eh != nil { + eh(p) + } else if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, err.Error()) + } + } else if sub.opts.AutoAck { + if err = p.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } + } } } } @@ -250,20 +239,20 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler } m.Lock() - m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub) + m.subscribers[topic] = append(m.subscribers[topic], sub) m.Unlock() go func() { <-sub.exit m.Lock() - newSubscribers := make([]*memorySubscriber, 0, len(m.batchsubscribers)-1) - for _, sb := range m.batchsubscribers[topic] { + newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1) + for _, sb := range m.subscribers[topic] { if sb.id == sub.id { continue } newSubscribers = append(newSubscribers, sb) } - m.batchsubscribers[topic] = newSubscribers + m.subscribers[topic] = newSubscribers m.Unlock() }() @@ -373,8 +362,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { // NewBroker return new memory broker func NewBroker(opts ...Option) BatchBroker { return &memoryBroker{ - opts: NewOptions(opts...), - subscribers: make(map[string][]*memorySubscriber), - batchsubscribers: make(map[string][]*memorySubscriber), + opts: NewOptions(opts...), + subscribers: make(map[string][]*memorySubscriber), } }