From d357fb1e0d3cb91abc045cd5f14747e69e593bba Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 22 Jul 2021 22:53:44 +0300 Subject: [PATCH] WIP: broker batch processing Signed-off-by: Vasiliy Tolstov --- broker/broker.go | 71 +++++++++-- broker/memory.go | 212 ++++++++++++++++++++++++++++++-- broker/memory_test.go | 58 ++++++++- broker/options.go | 20 +++ network/tunnel/broker/broker.go | 126 ++++++++++++++++++- 5 files changed, 461 insertions(+), 26 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index fc1bde6a..25f8732b 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -8,31 +8,79 @@ import ( "github.com/unistack-org/micro/v3/metadata" ) -// DefaultBroker default broker +// DefaultBroker default memory broker var DefaultBroker Broker = NewBroker() +var ( + // ErrNotConnected returns when broker used but not connected yet + ErrNotConnected = errors.New("broker not connected") + // ErrDisconnected returns when broker disconnected + ErrDisconnected = errors.New("broker disconnected") +) + // Broker is an interface used for asynchronous messaging. type Broker interface { + // Name returns broker instance name Name() string - Init(...Option) error + // Init initilize broker + Init(opts ...Option) error + // Options returns broker options Options() Options + // Address return configured address Address() string - Connect(context.Context) error - Disconnect(context.Context) error - Publish(context.Context, string, *Message, ...PublishOption) error - Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error) + // Connect connects to broker + Connect(ctx context.Context) error + // Disconnect disconnect from broker + Disconnect(ctx context.Context) error + // Publish message to broker topic + Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error + // BatchPublish messages to broker with multiple topics + BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error + // Subscribe subscribes to topic message via handler + Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) + // BatchSubscribe subscribes to topic messages via handler + BatchSubscribe(ctx context.Context, topic string, h BatchHandler, opts ...SubscribeOption) (Subscriber, error) + // String type of broker String() string } // Handler is used to process messages via a subscription of a topic. type Handler func(Event) error +// Events contains multiple events +type Events []Event + +func (evs Events) Ack() error { + var err error + for _, ev := range evs { + if err = ev.Ack(); err != nil { + return err + } + } + return nil +} + +func (evs Events) SetError(err error) { + for _, ev := range evs { + ev.SetError(err) + } +} + +// BatchHandler is used to process messages in batches via a subscription of a topic. +type BatchHandler func(Events) error + // Event is given to a subscription handler for processing type Event interface { + // Topic returns event topic Topic() string + // Message returns broker message Message() *Message + // Ack acknowledge message Ack() error + // Error returns message error (like decoding errors or some other) Error() error + // SetError set event processing error + SetError(err error) } // RawMessage is a raw encoded JSON value. @@ -58,13 +106,18 @@ func (m *RawMessage) UnmarshalJSON(data []byte) error { // Message is used to transfer data type Message struct { - Header metadata.Metadata // contains message metadata - Body RawMessage // contains message body + // Header contains message metadata + Header metadata.Metadata + // Body contains message body + Body RawMessage } // Subscriber is a convenience return type for the Subscribe method type Subscriber interface { + // Options returns subscriber options Options() SubscribeOptions + // Topic returns topic for subscription Topic() string - Unsubscribe(context.Context) error + // Unsubscribe from topic + Unsubscribe(ctx context.Context) error } diff --git a/broker/memory.go b/broker/memory.go index ab231b57..93b66772 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -2,7 +2,6 @@ package broker import ( "context" - "errors" "sync" "github.com/google/uuid" @@ -13,9 +12,10 @@ import ( ) type memoryBroker struct { - Subscribers map[string][]*memorySubscriber - addr string - opts Options + subscribers map[string][]*memorySubscriber + batchsubscribers map[string][]*memoryBatchSubscriber + addr string + opts Options sync.RWMutex connected bool } @@ -36,6 +36,15 @@ type memorySubscriber struct { opts SubscribeOptions } +type memoryBatchSubscriber struct { + ctx context.Context + exit chan bool + handler BatchHandler + id string + topic string + opts SubscribeOptions +} + func (m *memoryBroker) Options() Options { return m.opts } @@ -77,7 +86,6 @@ func (m *memoryBroker) Disconnect(ctx context.Context) error { } m.connected = false - return nil } @@ -88,14 +96,127 @@ func (m *memoryBroker) Init(opts ...Option) error { return nil } +func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { + m.RLock() + if !m.connected { + m.RUnlock() + return ErrNotConnected + } + m.RUnlock() + + type msgWrapper struct { + topic string + body interface{} + } + + vs := make([]msgWrapper, 0, len(msgs)) + if m.opts.Codec == nil { + m.RLock() + for _, msg := range msgs { + topic, _ := msg.Header.Get("Micro-Topic") + vs = append(vs, msgWrapper{topic: topic, body: m}) + } + m.RUnlock() + } else { + m.RLock() + for _, msg := range msgs { + topic, _ := msg.Header.Get("Micro-Topic") + buf, err := m.opts.Codec.Marshal(msg) + if err != nil { + m.RUnlock() + return err + } + vs = append(vs, msgWrapper{topic: topic, body: buf}) + } + m.RUnlock() + } + + if len(m.batchsubscribers) > 0 { + eh := m.opts.BatchErrorHandler + + msgTopicMap := make(map[string]Events) + for _, v := range vs { + p := &memoryEvent{ + topic: v.topic, + message: v.body, + opts: m.opts, + } + msgTopicMap[p.topic] = append(msgTopicMap[p.topic], p) + } + + for t, ms := range msgTopicMap { + m.RLock() + subs, ok := m.batchsubscribers[t] + m.RUnlock() + if !ok { + continue + } + for _, sub := range subs { + if err := sub.handler(ms); err != nil { + ms.SetError(err) + if sub.opts.BatchErrorHandler != nil { + eh = sub.opts.BatchErrorHandler + } + if eh != nil { + eh(ms) + } else if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, err.Error()) + } + } else if sub.opts.AutoAck { + if err := ms.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } + } + } + } + + } + + eh := m.opts.ErrorHandler + + for _, v := range vs { + p := &memoryEvent{ + topic: v.topic, + message: v.body, + opts: m.opts, + } + + m.RLock() + subs, ok := m.subscribers[p.topic] + m.RUnlock() + if !ok { + continue + } + for _, sub := range subs { + if err := sub.handler(p); err != nil { + p.SetError(err) + if sub.opts.ErrorHandler != nil { + eh = sub.opts.ErrorHandler + } + if eh != nil { + eh(p) + } else if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, err.Error()) + } + } else if sub.opts.AutoAck { + if err := p.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } + } + } + } + + return nil +} + func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { m.RLock() if !m.connected { m.RUnlock() - return errors.New("not connected") + return ErrNotConnected } - subs, ok := m.Subscribers[topic] + subs, ok := m.subscribers[topic] m.RUnlock() if !ok { return nil @@ -138,11 +259,58 @@ func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, return nil } +func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { + m.RLock() + if !m.connected { + m.RUnlock() + return nil, ErrNotConnected + } + m.RUnlock() + + options := NewSubscribeOptions(opts...) + + id, err := uuid.NewRandom() + if err != nil { + return nil, err + } + + sub := &memoryBatchSubscriber{ + exit: make(chan bool, 1), + id: id.String(), + topic: topic, + handler: handler, + opts: options, + ctx: ctx, + } + + m.Lock() + m.batchsubscribers[topic] = append(m.batchsubscribers[topic], sub) + m.Unlock() + + go func() { + <-sub.exit + m.Lock() + var newSubscribers []*memoryBatchSubscriber + for _, sb := range m.batchsubscribers[topic] { + if sb.id == sub.id { + continue + } + newSubscribers = append(newSubscribers, sb) + } + m.batchsubscribers[topic] = newSubscribers + m.Unlock() + }() + + return sub, nil + + return nil, nil +} + func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { m.RLock() if !m.connected { m.RUnlock() - return nil, errors.New("not connected") + return nil, ErrNotConnected } m.RUnlock() @@ -163,20 +331,20 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand } m.Lock() - m.Subscribers[topic] = append(m.Subscribers[topic], sub) + m.subscribers[topic] = append(m.subscribers[topic], sub) m.Unlock() go func() { <-sub.exit m.Lock() var newSubscribers []*memorySubscriber - for _, sb := range m.Subscribers[topic] { + for _, sb := range m.subscribers[topic] { if sb.id == sub.id { continue } newSubscribers = append(newSubscribers, sb) } - m.Subscribers[topic] = newSubscribers + m.subscribers[topic] = newSubscribers m.Unlock() }() @@ -221,6 +389,23 @@ func (m *memoryEvent) Error() error { return m.err } +func (m *memoryEvent) SetError(err error) { + m.err = err +} + +func (m *memoryBatchSubscriber) Options() SubscribeOptions { + return m.opts +} + +func (m *memoryBatchSubscriber) Topic() string { + return m.topic +} + +func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error { + m.exit <- true + return nil +} + func (m *memorySubscriber) Options() SubscribeOptions { return m.opts } @@ -237,7 +422,8 @@ func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { // NewBroker return new memory broker func NewBroker(opts ...Option) Broker { return &memoryBroker{ - opts: NewOptions(opts...), - Subscribers: make(map[string][]*memorySubscriber), + opts: NewOptions(opts...), + subscribers: make(map[string][]*memorySubscriber), + batchsubscribers: make(map[string][]*memoryBatchSubscriber), } } diff --git a/broker/memory_test.go b/broker/memory_test.go index 168c8d16..27bba388 100644 --- a/broker/memory_test.go +++ b/broker/memory_test.go @@ -6,6 +6,51 @@ import ( "testing" ) +func TestMemoryBatchBroker(t *testing.T) { + b := NewBroker() + ctx := context.Background() + + if err := b.Connect(ctx); err != nil { + t.Fatalf("Unexpected connect error %v", err) + } + + topic := "test" + count := 10 + + fn := func(evts Events) error { + return evts.Ack() + } + + sub, err := b.BatchSubscribe(ctx, topic, fn) + if err != nil { + t.Fatalf("Unexpected error subscribing %v", err) + } + + msgs := make([]*Message, 0, 0) + for i := 0; i < count; i++ { + message := &Message{ + Header: map[string]string{ + "Micro-Topic": topic, + "foo": "bar", + "id": fmt.Sprintf("%d", i), + }, + Body: []byte(`"hello world"`), + } + msgs = append(msgs, message) + } + + if err := b.BatchPublish(ctx, msgs); err != nil { + t.Fatalf("Unexpected error publishing %v", err) + } + + if err := sub.Unsubscribe(ctx); err != nil { + t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err) + } + + if err := b.Disconnect(ctx); err != nil { + t.Fatalf("Unexpected connect error %v", err) + } +} func TestMemoryBroker(t *testing.T) { b := NewBroker() ctx := context.Background() @@ -26,20 +71,27 @@ func TestMemoryBroker(t *testing.T) { t.Fatalf("Unexpected error subscribing %v", err) } + msgs := make([]*Message, 0, 0) for i := 0; i < count; i++ { message := &Message{ Header: map[string]string{ - "foo": "bar", - "id": fmt.Sprintf("%d", i), + "Micro-Topic": topic, + "foo": "bar", + "id": fmt.Sprintf("%d", i), }, Body: []byte(`"hello world"`), } + msgs = append(msgs, message) if err := b.Publish(ctx, topic, message); err != nil { - t.Fatalf("Unexpected error publishing %d", i) + t.Fatalf("Unexpected error publishing %d err: %v", i, err) } } + if err := b.BatchPublish(ctx, msgs); err != nil { + t.Fatalf("Unexpected error publishing %v", err) + } + if err := sub.Unsubscribe(ctx); err != nil { t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err) } diff --git a/broker/options.go b/broker/options.go index 26cb637a..d081adbe 100644 --- a/broker/options.go +++ b/broker/options.go @@ -29,6 +29,8 @@ type Options struct { TLSConfig *tls.Config // ErrorHandler used when broker can't unmarshal incoming message ErrorHandler Handler + // BatchErrorHandler used when broker can't unmashal incoming messages + BatchErrorHandler BatchHandler // Name holds the broker name Name string // Addrs holds the broker address @@ -85,6 +87,8 @@ type SubscribeOptions struct { Context context.Context // ErrorHandler used when broker can't unmarshal incoming message ErrorHandler Handler + // BatchErrorHandler used when broker can't unmashal incoming messages + BatchErrorHandler BatchHandler // Group holds consumer group Group string // AutoAck flag specifies auto ack of incoming message when no error happens @@ -175,6 +179,14 @@ func ErrorHandler(h Handler) Option { } } +// BatchErrorHandler will catch all broker errors that cant be handled +// in normal way, for example Codec errors +func BatchErrorHandler(h BatchHandler) Option { + return func(o *Options) { + o.BatchErrorHandler = h + } +} + // SubscribeErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors func SubscribeErrorHandler(h Handler) SubscribeOption { @@ -183,6 +195,14 @@ func SubscribeErrorHandler(h Handler) SubscribeOption { } } +// SubscribeBatchErrorHandler will catch all broker errors that cant be handled +// in normal way, for example Codec errors +func SubscribeBatchErrorHandler(h BatchHandler) SubscribeOption { + return func(o *SubscribeOptions) { + o.BatchErrorHandler = h + } +} + // Queue sets the subscribers queue // Deprecated func Queue(name string) SubscribeOption { diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index 460b9e77..1e9732e2 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -24,9 +24,18 @@ type tunSubscriber struct { opts broker.SubscribeOptions } +type tunBatchSubscriber struct { + listener tunnel.Listener + handler broker.BatchHandler + closed chan bool + topic string + opts broker.SubscribeOptions +} + type tunEvent struct { message *broker.Message topic string + err error } // used to access tunnel from options context @@ -62,6 +71,36 @@ func (t *tunBroker) Disconnect(ctx context.Context) error { return t.tunnel.Close(ctx) } +func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + // TODO: this is probably inefficient, we might want to just maintain an open connection + // it may be easier to add broadcast to the tunnel + topicMap := make(map[string]tunnel.Session) + + var err error + for _, msg := range msgs { + topic, _ := msg.Header.Get("Micro-Topic") + c, ok := topicMap[topic] + if !ok { + c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast)) + if err != nil { + return err + } + defer c.Close() + topicMap[topic] = c + } + + if err = c.Send(&transport.Message{ + Header: msg.Header, + Body: msg.Body, + }); err != nil { + // msg.SetError(err) + return err + } + } + + return nil +} + func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error { // TODO: this is probably inefficient, we might want to just maintain an open connection // it may be easier to add broadcast to the tunnel @@ -77,6 +116,26 @@ func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message }) } +func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast)) + if err != nil { + return nil, err + } + + tunSub := &tunBatchSubscriber{ + topic: topic, + handler: h, + opts: broker.NewSubscribeOptions(opts...), + closed: make(chan bool), + listener: l, + } + + // start processing + go tunSub.run() + + return tunSub, nil +} + func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast)) if err != nil { @@ -101,6 +160,49 @@ func (t *tunBroker) String() string { return "tunnel" } +func (t *tunBatchSubscriber) run() { + for { + // accept a new connection + c, err := t.listener.Accept() + if err != nil { + select { + case <-t.closed: + return + default: + continue + } + } + + // receive message + m := new(transport.Message) + if err := c.Recv(m); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Error(t.opts.Context, err.Error()) + } + if err = c.Close(); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Error(t.opts.Context, err.Error()) + } + } + continue + } + + // close the connection + c.Close() + + evts := broker.Events{&tunEvent{ + topic: t.topic, + message: &broker.Message{ + Header: m.Header, + Body: m.Body, + }, + }} + // handle the message + go t.handler(evts) + + } +} + func (t *tunSubscriber) run() { for { // accept a new connection @@ -142,6 +244,24 @@ func (t *tunSubscriber) run() { } } +func (t *tunBatchSubscriber) Options() broker.SubscribeOptions { + return t.opts +} + +func (t *tunBatchSubscriber) Topic() string { + return t.topic +} + +func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error { + select { + case <-t.closed: + return nil + default: + close(t.closed) + return t.listener.Close() + } +} + func (t *tunSubscriber) Options() broker.SubscribeOptions { return t.opts } @@ -173,7 +293,11 @@ func (t *tunEvent) Ack() error { } func (t *tunEvent) Error() error { - return nil + return t.err +} + +func (t *tunEvent) SetError(err error) { + t.err = err } // NewBroker returns new tunnel broker