update workflows

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2022-03-05 19:07:59 +03:00
parent 3bf96a1519
commit d5f177cc4e
7 changed files with 71 additions and 22 deletions

22
nats.go
View File

@@ -109,13 +109,15 @@ func (n *natsBroker) setAddrs(addrs []string) []string {
}
func (n *natsBroker) Connect(ctx context.Context) error {
n.Lock()
defer n.Unlock()
n.RLock()
if n.connected {
n.RUnlock()
return nil
}
n.RUnlock()
n.Lock()
defer n.Unlock()
status := nats.CLOSED
if n.conn != nil {
status = n.conn.Status()
@@ -133,7 +135,7 @@ func (n *natsBroker) Connect(ctx context.Context) error {
c, err := opts.Connect()
if err != nil {
if n.opts.Logger.V(logger.WarnLevel) {
n.opts.Logger.Warnf(n.opts.Context, "Error connecting to broker: %v", err)
n.opts.Logger.Warnf(n.opts.Context, "error connecting to broker: %v", err)
}
return err
@@ -145,12 +147,20 @@ func (n *natsBroker) Connect(ctx context.Context) error {
}
func (n *natsBroker) Disconnect(ctx context.Context) error {
n.RLock()
if !n.connected {
n.RUnlock()
return nil
}
n.RUnlock()
n.Lock()
defer n.Unlock()
// drain the connection if specified
if n.drain {
n.conn.Drain()
if err := n.conn.Drain(); err != nil {
return err
}
n.closeCh <- nil
}