diff --git a/broker/memory.go b/broker/memory.go index 15e0c907..2d72be0a 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -14,7 +14,7 @@ import ( type memoryBroker struct { subscribers map[string][]*memorySubscriber - batchsubscribers map[string][]*memoryBatchSubscriber + batchsubscribers map[string][]*memorySubscriber addr string opts Options sync.RWMutex @@ -29,21 +29,13 @@ type memoryEvent struct { } type memorySubscriber struct { - ctx context.Context - exit chan bool - handler Handler - id string - topic string - opts SubscribeOptions -} - -type memoryBatchSubscriber struct { - ctx context.Context - exit chan bool - handler BatchHandler - id string - topic string - opts SubscribeOptions + ctx context.Context + exit chan bool + handler Handler + batchhandler BatchHandler + id string + topic string + opts SubscribeOptions } func (m *memoryBroker) Options() Options { @@ -97,6 +89,35 @@ func (m *memoryBroker) Init(opts ...Option) error { return nil } +func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { + m.RLock() + if !m.connected { + m.RUnlock() + return ErrNotConnected + } + m.RUnlock() + + vs := make([]msgWrapper, 0, 1) + if m.opts.Codec == nil { + topic, _ := msg.Header.Get(metadata.HeaderTopic) + vs = append(vs, msgWrapper{topic: topic, body: msg}) + } else { + topic, _ := msg.Header.Get(metadata.HeaderTopic) + buf, err := m.opts.Codec.Marshal(msg) + if err != nil { + return err + } + vs = append(vs, msgWrapper{topic: topic, body: buf}) + } + + return m.publish(ctx, vs, opts...) +} + +type msgWrapper struct { + topic string + body interface{} +} + func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { m.RLock() if !m.connected { @@ -105,33 +126,27 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . } m.RUnlock() - type msgWrapper struct { - topic string - body interface{} - } - vs := make([]msgWrapper, 0, len(msgs)) if m.opts.Codec == nil { - m.RLock() for _, msg := range msgs { topic, _ := msg.Header.Get(metadata.HeaderTopic) - vs = append(vs, msgWrapper{topic: topic, body: m}) + vs = append(vs, msgWrapper{topic: topic, body: msg}) } - m.RUnlock() } else { - m.RLock() for _, msg := range msgs { topic, _ := msg.Header.Get(metadata.HeaderTopic) buf, err := m.opts.Codec.Marshal(msg) if err != nil { - m.RUnlock() return err } vs = append(vs, msgWrapper{topic: topic, body: buf}) } - m.RUnlock() } + return m.publish(ctx, vs, opts...) +} + +func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error { if len(m.batchsubscribers) > 0 { eh := m.opts.BatchErrorHandler @@ -153,7 +168,7 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . continue } for _, sub := range subs { - if err := sub.handler(ms); err != nil { + if err := sub.batchhandler(ms); err != nil { ms.SetError(err) if sub.opts.BatchErrorHandler != nil { eh = sub.opts.BatchErrorHandler @@ -210,56 +225,6 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . return nil } -func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { - m.RLock() - if !m.connected { - m.RUnlock() - return ErrNotConnected - } - - subs, ok := m.subscribers[topic] - m.RUnlock() - if !ok { - return nil - } - - var v interface{} - if m.opts.Codec != nil { - buf, err := m.opts.Codec.Marshal(msg) - if err != nil { - return err - } - v = buf - } else { - v = msg - } - - p := &memoryEvent{ - topic: topic, - message: v, - opts: m.opts, - } - - eh := m.opts.ErrorHandler - - for _, sub := range subs { - if err := sub.handler(p); err != nil { - p.err = err - if sub.opts.ErrorHandler != nil { - eh = sub.opts.ErrorHandler - } - if eh != nil { - eh(p) - } else if m.opts.Logger.V(logger.ErrorLevel) { - m.opts.Logger.Error(m.opts.Context, err.Error()) - } - continue - } - } - - return nil -} - func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { m.RLock() if !m.connected { @@ -268,20 +233,20 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler } m.RUnlock() - options := NewSubscribeOptions(opts...) - id, err := uuid.NewRandom() if err != nil { return nil, err } - sub := &memoryBatchSubscriber{ - exit: make(chan bool, 1), - id: id.String(), - topic: topic, - handler: handler, - opts: options, - ctx: ctx, + options := NewSubscribeOptions(opts...) + + sub := &memorySubscriber{ + exit: make(chan bool, 1), + id: id.String(), + topic: topic, + batchhandler: handler, + opts: options, + ctx: ctx, } m.Lock() @@ -291,7 +256,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler go func() { <-sub.exit m.Lock() - var newSubscribers []*memoryBatchSubscriber + newSubscribers := make([]*memorySubscriber, 0, len(m.batchsubscribers)-1) for _, sb := range m.batchsubscribers[topic] { if sb.id == sub.id { continue @@ -313,13 +278,13 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand } m.RUnlock() - options := NewSubscribeOptions(opts...) - id, err := uuid.NewRandom() if err != nil { return nil, err } + options := NewSubscribeOptions(opts...) + sub := &memorySubscriber{ exit: make(chan bool, 1), id: id.String(), @@ -336,7 +301,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand go func() { <-sub.exit m.Lock() - var newSubscribers []*memorySubscriber + newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1) for _, sb := range m.subscribers[topic] { if sb.id == sub.id { continue @@ -392,19 +357,6 @@ func (m *memoryEvent) SetError(err error) { m.err = err } -func (m *memoryBatchSubscriber) Options() SubscribeOptions { - return m.opts -} - -func (m *memoryBatchSubscriber) Topic() string { - return m.topic -} - -func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error { - m.exit <- true - return nil -} - func (m *memorySubscriber) Options() SubscribeOptions { return m.opts } @@ -423,6 +375,6 @@ func NewBroker(opts ...Option) BatchBroker { return &memoryBroker{ opts: NewOptions(opts...), subscribers: make(map[string][]*memorySubscriber), - batchsubscribers: make(map[string][]*memoryBatchSubscriber), + batchsubscribers: make(map[string][]*memorySubscriber), } }