broker/memory: optimize

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-23 15:12:20 +03:00
parent 419cd486cf
commit 80e3d239ab

View File

@ -13,10 +13,9 @@ import (
) )
type memoryBroker struct { type memoryBroker struct {
subscribers map[string][]*memorySubscriber subscribers map[string][]*memorySubscriber
batchsubscribers map[string][]*memorySubscriber addr string
addr string opts Options
opts Options
sync.RWMutex sync.RWMutex
connected bool connected bool
} }
@ -147,76 +146,66 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
} }
func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error { func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error {
if len(m.batchsubscribers) > 0 { var err error
eh := m.opts.BatchErrorHandler
msgTopicMap := make(map[string]Events)
for _, v := range vs {
p := &memoryEvent{
topic: v.topic,
message: v.body,
opts: m.opts,
}
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
for t, ms := range msgTopicMap {
m.RLock()
subs, ok := m.batchsubscribers[t]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if err := sub.batchhandler(ms); err != nil {
ms.SetError(err)
if sub.opts.BatchErrorHandler != nil {
eh = sub.opts.BatchErrorHandler
}
if eh != nil {
eh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err := ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
}
eh := m.opts.ErrorHandler
msgTopicMap := make(map[string]Events)
for _, v := range vs { for _, v := range vs {
p := &memoryEvent{ p := &memoryEvent{
topic: v.topic, topic: v.topic,
message: v.body, message: v.body,
opts: m.opts, opts: m.opts,
} }
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
beh := m.opts.BatchErrorHandler
eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap {
m.RLock() m.RLock()
subs, ok := m.subscribers[p.topic] subs, ok := m.subscribers[t]
m.RUnlock() m.RUnlock()
if !ok { if !ok {
continue continue
} }
for _, sub := range subs { for _, sub := range subs {
if err := sub.handler(p); err != nil { // batch processing
p.SetError(err) if sub.batchhandler != nil {
if sub.opts.ErrorHandler != nil { if err = sub.batchhandler(ms); err != nil {
eh = sub.opts.ErrorHandler ms.SetError(err)
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if beh != nil {
beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
} }
if eh != nil { }
eh(p) // single processing
} else if m.opts.Logger.V(logger.ErrorLevel) { if sub.handler != nil {
m.opts.Logger.Error(m.opts.Context, err.Error()) for _, p := range ms {
} if err = sub.handler(p); err != nil {
} else if sub.opts.AutoAck { p.SetError(err)
if err := p.Ack(); err != nil { if sub.opts.ErrorHandler != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
} }
} }
} }
@ -250,20 +239,20 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
} }
m.Lock() m.Lock()
m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub) m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock() m.Unlock()
go func() { go func() {
<-sub.exit <-sub.exit
m.Lock() m.Lock()
newSubscribers := make([]*memorySubscriber, 0, len(m.batchsubscribers)-1) newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.batchsubscribers[topic] { for _, sb := range m.subscribers[topic] {
if sb.id == sub.id { if sb.id == sub.id {
continue continue
} }
newSubscribers = append(newSubscribers, sb) newSubscribers = append(newSubscribers, sb)
} }
m.batchsubscribers[topic] = newSubscribers m.subscribers[topic] = newSubscribers
m.Unlock() m.Unlock()
}() }()
@ -373,8 +362,7 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
// NewBroker return new memory broker // NewBroker return new memory broker
func NewBroker(opts ...Option) BatchBroker { func NewBroker(opts ...Option) BatchBroker {
return &memoryBroker{ return &memoryBroker{
opts: NewOptions(opts...), opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber), subscribers: make(map[string][]*memorySubscriber),
batchsubscribers: make(map[string][]*memorySubscriber),
} }
} }