diff --git a/broker/default.go b/broker/default.go index dd4a95ab..9228daad 100644 --- a/broker/default.go +++ b/broker/default.go @@ -295,6 +295,10 @@ func (n *natsBroker) Connect() error { return nil default: // DISCONNECTED or CLOSED or DRAINING opts := n.nopts + opts.DrainTimeout = 1 * time.Second + opts.AsyncErrorCB = n.onAsyncError + opts.DisconnectedErrCB = n.onDisconnectedError + opts.ClosedCB = n.onClose opts.Servers = n.servers opts.Secure = n.opts.Secure opts.TLSConfig = n.opts.TLSConfig @@ -324,7 +328,7 @@ func (n *natsBroker) Disconnect() error { // drain the connection if specified if n.drain { n.conn.Drain() - return <-n.closeCh + n.closeCh <- nil } // close the client connection @@ -434,6 +438,10 @@ func (n *natsBroker) onClose(conn *nats.Conn) { n.closeCh <- nil } +func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) { + n.closeCh <- nil +} + func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) { // There are kinds of different async error nats might callback, but we are interested // in ErrDrainTimeout only here. diff --git a/broker/nats/nats.go b/broker/nats/nats.go index f4e00fce..680657d9 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -318,7 +318,7 @@ func (n *natsBroker) Disconnect() error { // drain the connection if specified if n.drain { n.conn.Drain() - return <-n.closeCh + n.closeCh <- nil } // close the client connection @@ -440,6 +440,7 @@ func (n *natsBroker) setOption(opts ...broker.Option) { n.closeCh = make(chan error) n.nopts.ClosedCB = n.onClose n.nopts.AsyncErrorCB = n.onAsyncError + n.nopts.DisconnectedErrCB = n.onDisconnectedError } } @@ -455,6 +456,10 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e } } +func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) { + n.closeCh <- nil +} + func NewBroker(opts ...broker.Option) broker.Broker { options := broker.Options{ // Default codec