From 79ad1e6fe3998e65b26e8afbb0e35ae3fdb74b76 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 11 Feb 2020 21:41:23 +0300 Subject: [PATCH] various fixes for broker and messaging in server (#1187) * provide broker disconnect messages in server Signed-off-by: Vasiliy Tolstov * broker/eats: another fix Signed-off-by: Vasiliy Tolstov --- broker/default.go | 24 ++++++++++++++++-------- broker/nats/nats.go | 2 +- server/grpc/grpc.go | 6 ++---- server/rpc_server.go | 1 + 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/broker/default.go b/broker/default.go index 9228daad..a71a5d17 100644 --- a/broker/default.go +++ b/broker/default.go @@ -240,6 +240,10 @@ func (n *natsBroker) serve(exit chan bool) error { // register the cluster address for { select { + case err := <-n.closeCh: + if err != nil { + log.Log(err) + } case <-exit: // deregister on exit n.opts.Registry.Deregister(®istry.Service{ @@ -282,7 +286,6 @@ func (n *natsBroker) Connect() error { } // set to connected - n.connected = true } status := nats.CLOSED @@ -313,6 +316,9 @@ func (n *natsBroker) Connect() error { return err } n.conn = c + + n.connected = true + return nil } } @@ -328,7 +334,6 @@ func (n *natsBroker) Disconnect() error { // drain the connection if specified if n.drain { n.conn.Drain() - n.closeCh <- nil } // close the client connection @@ -336,10 +341,12 @@ func (n *natsBroker) Disconnect() error { // shutdown the local server // and deregister - select { - case <-n.exit: - default: - close(n.exit) + if n.server != nil { + select { + case <-n.exit: + default: + close(n.exit) + } } // set not connected @@ -439,7 +446,7 @@ func (n *natsBroker) onClose(conn *nats.Conn) { } func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) { - n.closeCh <- nil + n.closeCh <- err } func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) { @@ -459,7 +466,8 @@ func NewBroker(opts ...Option) Broker { } n := &natsBroker{ - opts: options, + opts: options, + closeCh: make(chan error), } n.setOption(opts...) diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 680657d9..2cce84e4 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -457,7 +457,7 @@ func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err e } func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) { - n.closeCh <- nil + n.closeCh <- err } func NewBroker(opts ...broker.Option) broker.Broker { diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index dd2daeff..5aed508e 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -805,10 +805,7 @@ func (g *grpcServer) Start() error { return err } - baddr := config.Broker.Address() - bname := config.Broker.String() - - log.Logf("Broker [%s] Connected to %s", bname, baddr) + log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } // announce self to the world @@ -876,6 +873,7 @@ func (g *grpcServer) Start() error { // close transport ch <- nil + log.Logf("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) // disconnect broker config.Broker.Disconnect() }() diff --git a/server/rpc_server.go b/server/rpc_server.go index 7eba3806..6c0c793d 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -879,6 +879,7 @@ func (s *rpcServer) Start() error { // close transport listener ch <- ts.Close() + log.Logf("Broker [%s] Disconnected from %s", bname, config.Broker.Address()) // disconnect the broker config.Broker.Disconnect()