broker/memory: cleanup

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-07-23 15:06:10 +03:00
parent e64269b2a8
commit 419cd486cf

View File

@ -14,7 +14,7 @@ import (
type memoryBroker struct { type memoryBroker struct {
subscribers map[string][]*memorySubscriber subscribers map[string][]*memorySubscriber
batchsubscribers map[string][]*memoryBatchSubscriber batchsubscribers map[string][]*memorySubscriber
addr string addr string
opts Options opts Options
sync.RWMutex sync.RWMutex
@ -29,21 +29,13 @@ type memoryEvent struct {
} }
type memorySubscriber struct { type memorySubscriber struct {
ctx context.Context ctx context.Context
exit chan bool exit chan bool
handler Handler handler Handler
id string batchhandler BatchHandler
topic string id string
opts SubscribeOptions topic string
} opts SubscribeOptions
type memoryBatchSubscriber struct {
ctx context.Context
exit chan bool
handler BatchHandler
id string
topic string
opts SubscribeOptions
} }
func (m *memoryBroker) Options() Options { func (m *memoryBroker) Options() Options {
@ -97,6 +89,35 @@ func (m *memoryBroker) Init(opts ...Option) error {
return nil return nil
} }
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
}
m.RUnlock()
vs := make([]msgWrapper, 0, 1)
if m.opts.Codec == nil {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: msg})
} else {
topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
vs = append(vs, msgWrapper{topic: topic, body: buf})
}
return m.publish(ctx, vs, opts...)
}
type msgWrapper struct {
topic string
body interface{}
}
func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error { func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts ...PublishOption) error {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
@ -105,33 +126,27 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
} }
m.RUnlock() m.RUnlock()
type msgWrapper struct {
topic string
body interface{}
}
vs := make([]msgWrapper, 0, len(msgs)) vs := make([]msgWrapper, 0, len(msgs))
if m.opts.Codec == nil { if m.opts.Codec == nil {
m.RLock()
for _, msg := range msgs { for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic) topic, _ := msg.Header.Get(metadata.HeaderTopic)
vs = append(vs, msgWrapper{topic: topic, body: m}) vs = append(vs, msgWrapper{topic: topic, body: msg})
} }
m.RUnlock()
} else { } else {
m.RLock()
for _, msg := range msgs { for _, msg := range msgs {
topic, _ := msg.Header.Get(metadata.HeaderTopic) topic, _ := msg.Header.Get(metadata.HeaderTopic)
buf, err := m.opts.Codec.Marshal(msg) buf, err := m.opts.Codec.Marshal(msg)
if err != nil { if err != nil {
m.RUnlock()
return err return err
} }
vs = append(vs, msgWrapper{topic: topic, body: buf}) vs = append(vs, msgWrapper{topic: topic, body: buf})
} }
m.RUnlock()
} }
return m.publish(ctx, vs, opts...)
}
func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...PublishOption) error {
if len(m.batchsubscribers) > 0 { if len(m.batchsubscribers) > 0 {
eh := m.opts.BatchErrorHandler eh := m.opts.BatchErrorHandler
@ -153,7 +168,7 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
continue continue
} }
for _, sub := range subs { for _, sub := range subs {
if err := sub.handler(ms); err != nil { if err := sub.batchhandler(ms); err != nil {
ms.SetError(err) ms.SetError(err)
if sub.opts.BatchErrorHandler != nil { if sub.opts.BatchErrorHandler != nil {
eh = sub.opts.BatchErrorHandler eh = sub.opts.BatchErrorHandler
@ -210,56 +225,6 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts .
return nil return nil
} }
func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error {
m.RLock()
if !m.connected {
m.RUnlock()
return ErrNotConnected
}
subs, ok := m.subscribers[topic]
m.RUnlock()
if !ok {
return nil
}
var v interface{}
if m.opts.Codec != nil {
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
v = buf
} else {
v = msg
}
p := &memoryEvent{
topic: topic,
message: v,
opts: m.opts,
}
eh := m.opts.ErrorHandler
for _, sub := range subs {
if err := sub.handler(p); err != nil {
p.err = err
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil {
eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error())
}
continue
}
}
return nil
}
func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) { func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler BatchHandler, opts ...SubscribeOption) (Subscriber, error) {
m.RLock() m.RLock()
if !m.connected { if !m.connected {
@ -268,20 +233,20 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
} }
m.RUnlock() m.RUnlock()
options := NewSubscribeOptions(opts...)
id, err := uuid.NewRandom() id, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, err return nil, err
} }
sub := &memoryBatchSubscriber{ options := NewSubscribeOptions(opts...)
exit: make(chan bool, 1),
id: id.String(), sub := &memorySubscriber{
topic: topic, exit: make(chan bool, 1),
handler: handler, id: id.String(),
opts: options, topic: topic,
ctx: ctx, batchhandler: handler,
opts: options,
ctx: ctx,
} }
m.Lock() m.Lock()
@ -291,7 +256,7 @@ func (m *memoryBroker) BatchSubscribe(ctx context.Context, topic string, handler
go func() { go func() {
<-sub.exit <-sub.exit
m.Lock() m.Lock()
var newSubscribers []*memoryBatchSubscriber newSubscribers := make([]*memorySubscriber, 0, len(m.batchsubscribers)-1)
for _, sb := range m.batchsubscribers[topic] { for _, sb := range m.batchsubscribers[topic] {
if sb.id == sub.id { if sb.id == sub.id {
continue continue
@ -313,13 +278,13 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
} }
m.RUnlock() m.RUnlock()
options := NewSubscribeOptions(opts...)
id, err := uuid.NewRandom() id, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, err return nil, err
} }
options := NewSubscribeOptions(opts...)
sub := &memorySubscriber{ sub := &memorySubscriber{
exit: make(chan bool, 1), exit: make(chan bool, 1),
id: id.String(), id: id.String(),
@ -336,7 +301,7 @@ func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Hand
go func() { go func() {
<-sub.exit <-sub.exit
m.Lock() m.Lock()
var newSubscribers []*memorySubscriber newSubscribers := make([]*memorySubscriber, 0, len(m.subscribers)-1)
for _, sb := range m.subscribers[topic] { for _, sb := range m.subscribers[topic] {
if sb.id == sub.id { if sb.id == sub.id {
continue continue
@ -392,19 +357,6 @@ func (m *memoryEvent) SetError(err error) {
m.err = err m.err = err
} }
func (m *memoryBatchSubscriber) Options() SubscribeOptions {
return m.opts
}
func (m *memoryBatchSubscriber) Topic() string {
return m.topic
}
func (m *memoryBatchSubscriber) Unsubscribe(ctx context.Context) error {
m.exit <- true
return nil
}
func (m *memorySubscriber) Options() SubscribeOptions { func (m *memorySubscriber) Options() SubscribeOptions {
return m.opts return m.opts
} }
@ -423,6 +375,6 @@ func NewBroker(opts ...Option) BatchBroker {
return &memoryBroker{ return &memoryBroker{
opts: NewOptions(opts...), opts: NewOptions(opts...),
subscribers: make(map[string][]*memorySubscriber), subscribers: make(map[string][]*memorySubscriber),
batchsubscribers: make(map[string][]*memoryBatchSubscriber), batchsubscribers: make(map[string][]*memorySubscriber),
} }
} }