fix race condition in nats broker

This commit is contained in:
Asim Aslam 2018-11-05 06:39:28 +00:00 committed by Vasiliy Tolstov
parent a3fcb74f93
commit 4776921d4d

13
nats.go
View File

@ -4,6 +4,7 @@ package nats
import (
"context"
"strings"
"sync"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/broker/codec/json"
@ -12,6 +13,7 @@ import (
)
type nbroker struct {
sync.RWMutex
addrs []string
conn *nats.Conn
opts broker.Options
@ -85,9 +87,12 @@ func setAddrs(addrs []string) []string {
}
func (n *nbroker) Connect() error {
n.RLock()
if n.conn != nil && n.conn.IsConnected() {
n.RUnlock()
return nil
}
n.RUnlock()
opts := n.nopts
opts.Servers = n.addrs
@ -103,12 +108,16 @@ func (n *nbroker) Connect() error {
if err != nil {
return err
}
n.Lock()
n.conn = c
n.Unlock()
return nil
}
func (n *nbroker) Disconnect() error {
n.RLock()
n.conn.Close()
n.RUnlock()
return nil
}
@ -129,6 +138,8 @@ func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ
if err != nil {
return err
}
n.RLock()
defer n.RUnlock()
return n.conn.Publish(topic, b)
}
@ -152,11 +163,13 @@ func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
var sub *nats.Subscription
var err error
n.RLock()
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
n.RUnlock()
if err != nil {
return nil, err
}