WIP: broker batch processing

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-07-22 22:53:44 +03:00
parent e4673bcc50
commit d357fb1e0d
5 changed files with 461 additions and 26 deletions

View File

@@ -2,7 +2,6 @@ package broker
import (
"context"
"errors"
"sync"
"github.com/google/uuid"
@@ -13,9 +12,10 @@ import (
)
type memoryBroker struct {
Subscribers map[string][]*memorySubscriber
addr string
opts Options
subscribers map[string][]*memorySubscriber
batchsubscribers map[string][]*memoryBatchSubscriber
addr string
opts Options
sync.RWMutex
connected bool
}
@@ -36,6 +36,15 @@ type memorySubscriber struct {
opts SubscribeOptions
}
type memoryBatchSubscriber struct {
ctx context.Context
exit chan bool
handler BatchHandler
id string
topic string
opts SubscribeOptions
}
func (m *memoryBroker) Options() Options {
return m.opts
}
@@ -77,7 +86,6 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error {
}
m.connected = false
return nil
}
@@ -88,14 +96,127 @@ func (m *memoryBroker) Init(opts ...Option) error {
return nil
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
}
m.RUnlock()
type msgWrapper struct {
topic string
body interface{}
}
vs := make([]msgWrapper, 0, len(msgs))
if m.opts.Codec == nil {
m.RLock()
for _, msg := range msgs {
topic, _ := msg.Header.Get("Micro-Topic")
vs = append(vs, msgWrapper{topic: topic, body: m})
}
m.RUnlock()
} else {
m.RLock()
for _, msg := range msgs {
topic, _ := msg.Header.Get("Micro-Topic")
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
m.RUnlock()
return err
}
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
m.RUnlock()
}
if len(m.batchsubscribers) > 0 {
eh := m.opts.BatchErrorHandler
msgTopicMap := make(map[string]Events)
for _, v := range vs {
p := &memoryEvent{
topic: v.topic,
message: v.body,
opts: m.opts,
}
msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p)
}
for t, ms := range msgTopicMap {
m.RLock()
subs, ok := m.batchsubscribers[t]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if err := sub.handler(ms); err != nil {
ms.SetError(err)
if sub.opts.BatchErrorHandler != nil {
eh = sub.opts.BatchErrorHandler
}
if eh != nil {
eh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err := ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
}
eh := m.opts.ErrorHandler
for _, v := range vs {
p := &memoryEvent{
topic: v.topic,
message: v.body,
opts: m.opts,
}
m.RLock()
subs, ok := m.subscribers[p.topic]
m.RUnlock()
if !ok {
continue
}
for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.SetError(err)
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
} else if sub.opts.AutoAck {
if err := p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
}
}
}
}
return nil
}
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return errors.New("not connected")
return ErrNotConnected
}
subs, ok := m.Subscribers[topic]
subs, ok := m.subscribers[topic]
m.RUnlock()
if !ok {
return nil
@@ -138,11 +259,58 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message,
return nil
}
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, ErrNotConnected
}
m.RUnlock()
options := NewSubscribeOptions(opts...)
id, err := uuid.NewRandom()
if err != nil {
return nil, err
}
sub := &memoryBatchSubscriber{
exit: make(chan bool, 1),
id: id.String(),
topic: topic,
handler: handler,
opts: options,
ctx: ctx,
}
m.Lock()
m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memoryBatchSubscriber
for _, sb := range m.batchsubscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.batchsubscribers[topic] = newSubscribers
m.Unlock()
}()
return sub, nil
return nil, nil
}
func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock()
if !m.connected {
m.RUnlock()
return nil, errors.New("not connected")
return nil, ErrNotConnected
}
m.RUnlock()
@@ -163,20 +331,20 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
}
m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.subscribers[topic] = append(m.subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit
m.Lock()
var newSubscribers []*memorySubscriber
for _, sb := range m.Subscribers[topic] {
for _, sb := range m.subscribers[topic] {
if sb.id == sub.id {
continue
}
newSubscribers = append(newSubscribers, sb)
}
m.Subscribers[topic] = newSubscribers
m.subscribers[topic] = newSubscribers
m.Unlock()
}()
@@ -221,6 +389,23 @@ func (m *memoryEvent) Error() error {
return m.err
}
func (m *memoryEvent) SetError(err error) {
m.err = err
}
func (m *memoryBatchSubscriber) Options() SubscribeOptions {
return m.opts
}
func (m *memoryBatchSubscriber) Topic() string {
return m.topic
}
func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error {
m.exit <- true
return nil
}
func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts
}
@@ -237,7 +422,8 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error {
// NewBroker return new memory broker
func NewBroker(opts ...Option) Broker {
return &memoryBroker{
opts: NewOptions(opts...),
Subscribers: make(map[string][]*memorySubscriber),
opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber),
batchsubscribers: make(map[string][]*memoryBatchSubscriber),
}
}