Merge pull request #583 from unistack-org/broker

broker memory: fix issue with publish/subscribe
This commit is contained in:
Asim Aslam 2019-07-13 00:16:28 +01:00 committed by GitHub
commit 2fecde1dbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -72,14 +72,14 @@ func (m *memoryBroker) Init(opts ...broker.Option) error {
} }
func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error { func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error {
m.Lock() m.RLock()
defer m.Unlock()
if !m.connected { if !m.connected {
m.RUnlock()
return errors.New("not connected") return errors.New("not connected")
} }
subs, ok := m.Subscribers[topic] subs, ok := m.Subscribers[topic]
m.RUnlock()
if !ok { if !ok {
return nil return nil
} }
@ -99,12 +99,12 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br
} }
func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.Lock() m.RLock()
defer m.Unlock()
if !m.connected { if !m.connected {
m.RUnlock()
return nil, errors.New("not connected") return nil, errors.New("not connected")
} }
m.RUnlock()
var options broker.SubscribeOptions var options broker.SubscribeOptions
for _, o := range opts { for _, o := range opts {
@ -119,7 +119,9 @@ func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...b
opts: options, opts: options,
} }
m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub) m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.Unlock()
go func() { go func() {
<-sub.exit <-sub.exit