changed embedded mutex to private field (#217)
This commit is contained in:
@@ -22,8 +22,8 @@ type Broker struct {
|
||||
subscribers map[string][]*Subscriber
|
||||
addr string
|
||||
opts broker.Options
|
||||
sync.RWMutex
|
||||
connected bool
|
||||
mu sync.RWMutex
|
||||
connected bool
|
||||
}
|
||||
|
||||
type memoryMessage struct {
|
||||
@@ -72,9 +72,9 @@ func (b *Broker) newCodec(ct string) (codec.Codec, error) {
|
||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||
ct = ct[:idx]
|
||||
}
|
||||
b.RLock()
|
||||
b.mu.RLock()
|
||||
c, ok := b.opts.Codecs[ct]
|
||||
b.RUnlock()
|
||||
b.mu.RUnlock()
|
||||
if ok {
|
||||
return c, nil
|
||||
}
|
||||
@@ -96,8 +96,8 @@ func (b *Broker) Connect(ctx context.Context) error {
|
||||
default:
|
||||
}
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if b.connected {
|
||||
return nil
|
||||
@@ -126,8 +126,8 @@ func (b *Broker) Disconnect(ctx context.Context) error {
|
||||
default:
|
||||
}
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if !b.connected {
|
||||
return nil
|
||||
@@ -183,12 +183,12 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
|
||||
}
|
||||
|
||||
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
||||
b.RLock()
|
||||
b.mu.RLock()
|
||||
if !b.connected {
|
||||
b.RUnlock()
|
||||
b.mu.RUnlock()
|
||||
return broker.ErrNotConnected
|
||||
}
|
||||
b.RUnlock()
|
||||
b.mu.RUnlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -196,9 +196,9 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
||||
default:
|
||||
}
|
||||
|
||||
b.RLock()
|
||||
b.mu.RLock()
|
||||
subs, ok := b.subscribers[topic]
|
||||
b.RUnlock()
|
||||
b.mu.RUnlock()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
@@ -255,12 +255,12 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.RLock()
|
||||
b.mu.RLock()
|
||||
if !b.connected {
|
||||
b.RUnlock()
|
||||
b.mu.RUnlock()
|
||||
return nil, broker.ErrNotConnected
|
||||
}
|
||||
b.RUnlock()
|
||||
b.mu.RUnlock()
|
||||
|
||||
sid, err := id.New()
|
||||
if err != nil {
|
||||
@@ -278,13 +278,13 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
b.Lock()
|
||||
b.mu.Lock()
|
||||
b.subscribers[topic] = append(b.subscribers[topic], sub)
|
||||
b.Unlock()
|
||||
b.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
<-sub.exit
|
||||
b.Lock()
|
||||
b.mu.Lock()
|
||||
newSubscribers := make([]*Subscriber, 0, len(b.subscribers)-1)
|
||||
for _, sb := range b.subscribers[topic] {
|
||||
if sb.id == sub.id {
|
||||
@@ -293,7 +293,7 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
||||
newSubscribers = append(newSubscribers, sb)
|
||||
}
|
||||
b.subscribers[topic] = newSubscribers
|
||||
b.Unlock()
|
||||
b.mu.Unlock()
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
|
Reference in New Issue
Block a user