stan: fix data race in Connect()
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
7d84f2005b
commit
44de02e879
10
stan.go
10
stan.go
@ -129,15 +129,20 @@ func (n *stanBroker) reconnectCB(c stan.Conn, err error) {
|
|||||||
func (n *stanBroker) connect() error {
|
func (n *stanBroker) connect() error {
|
||||||
timeout := make(<-chan time.Time)
|
timeout := make(<-chan time.Time)
|
||||||
|
|
||||||
|
n.RLock()
|
||||||
if n.connectTimeout > 0 {
|
if n.connectTimeout > 0 {
|
||||||
timeout = time.After(n.connectTimeout)
|
timeout = time.After(n.connectTimeout)
|
||||||
}
|
}
|
||||||
|
clusterID := n.clusterID
|
||||||
|
clientID := n.clientID
|
||||||
|
nopts := n.nopts
|
||||||
|
n.RUnlock()
|
||||||
|
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
fn := func() error {
|
fn := func() error {
|
||||||
c, err := stan.Connect(n.clusterID, n.clientID, n.nopts...)
|
c, err := stan.Connect(clusterID, clientID, nopts...)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
n.Lock()
|
n.Lock()
|
||||||
n.conn = c
|
n.conn = c
|
||||||
@ -199,6 +204,7 @@ func (n *stanBroker) Connect() error {
|
|||||||
clientID = uuid.New().String()
|
clientID = uuid.New().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
n.Lock()
|
||||||
if v, ok := n.opts.Context.Value(connectRetryKey{}).(bool); ok && v {
|
if v, ok := n.opts.Context.Value(connectRetryKey{}).(bool); ok && v {
|
||||||
n.connectRetry = true
|
n.connectRetry = true
|
||||||
}
|
}
|
||||||
@ -208,6 +214,7 @@ func (n *stanBroker) Connect() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if n.sopts.ConnectionLostCB != nil && n.connectRetry {
|
if n.sopts.ConnectionLostCB != nil && n.connectRetry {
|
||||||
|
n.Unlock()
|
||||||
return errors.New("impossible to use custom ConnectionLostCB and ConnectRetry(true)")
|
return errors.New("impossible to use custom ConnectionLostCB and ConnectRetry(true)")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -226,7 +233,6 @@ func (n *stanBroker) Connect() error {
|
|||||||
}
|
}
|
||||||
nopts = append(nopts, stan.NatsURL(strings.Join(n.addrs, ",")))
|
nopts = append(nopts, stan.NatsURL(strings.Join(n.addrs, ",")))
|
||||||
|
|
||||||
n.Lock()
|
|
||||||
n.nopts = nopts
|
n.nopts = nopts
|
||||||
n.clusterID = clusterID
|
n.clusterID = clusterID
|
||||||
n.clientID = clientID
|
n.clientID = clientID
|
||||||
|
Loading…
Reference in New Issue
Block a user