From 80dc0b97a94c7ad945fe323f8fc4418562b2e2c2 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 23 Aug 2019 15:00:29 +0100 Subject: [PATCH 1/2] Make server starts and stops idempotent --- server/grpc/grpc.go | 35 +++++++++++++++++++++++++++++++++-- server/rpc_server.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index bb968d94..86025a9d 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -53,6 +53,8 @@ type grpcServer struct { opts server.Options handlers map[string]server.Handler subscribers map[*subscriber][]broker.Subscriber + // marks the serve as started + started bool // used for first registration registered bool } @@ -454,7 +456,10 @@ func (g *grpcServer) newCodec(contentType string) (codec.NewCodec, error) { } func (g *grpcServer) Options() server.Options { + g.RLock() opts := g.opts + g.RUnlock() + return opts } @@ -700,7 +705,14 @@ func (g *grpcServer) Deregister() error { } func (g *grpcServer) Start() error { - config := g.opts + g.RLock() + if g.started { + g.RUnlock() + return nil + } + g.RUnlock() + + config := g.Options() // micro: config.Transport.Listen(config.Address) ts, err := net.Listen("tcp", config.Address) @@ -781,13 +793,32 @@ func (g *grpcServer) Start() error { config.Broker.Disconnect() }() + // mark the server as started + g.Lock() + g.started = true + g.Unlock() + return nil } func (g *grpcServer) Stop() error { + g.RLock() + if !g.started { + g.RUnlock() + return nil + } + g.RUnlock() + ch := make(chan error) g.exit <- ch - return <-ch + + var err error + select { + case err = <-ch: + g.started = false + } + + return err } func (g *grpcServer) String() string { diff --git a/server/rpc_server.go b/server/rpc_server.go index f7ce74ec..4b825a8b 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -30,6 +30,8 @@ type rpcServer struct { opts Options handlers map[string]Handler subscribers map[*subscriber][]broker.Subscriber + // marks the serve as started + started bool // used for first registration registered bool // graceful exit @@ -584,6 +586,13 @@ func (s *rpcServer) Deregister() error { } func (s *rpcServer) Start() error { + s.RLock() + if s.started { + s.RUnlock() + return nil + } + s.RUnlock() + config := s.Options() // start listening on the transport @@ -708,13 +717,32 @@ func (s *rpcServer) Start() error { s.Unlock() }() + // mark the server as started + s.Lock() + s.started = true + s.Unlock() + return nil } func (s *rpcServer) Stop() error { + s.RLock() + if !s.started { + s.RUnlock() + return nil + } + s.RUnlock() + ch := make(chan error) s.exit <- ch - return <-ch + + var err error + select { + case err = <-ch: + s.started = false + } + + return err } func (s *rpcServer) String() string { From ba99f037fbffd18af0077862beae9cb271122dfa Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 23 Aug 2019 15:07:08 +0100 Subject: [PATCH 2/2] Lock started flag when changing it. --- server/grpc/grpc.go | 2 ++ server/rpc_server.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 86025a9d..4dcd6d73 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -815,7 +815,9 @@ func (g *grpcServer) Stop() error { var err error select { case err = <-ch: + g.Lock() g.started = false + g.Unlock() } return err diff --git a/server/rpc_server.go b/server/rpc_server.go index 4b825a8b..c02b893a 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -739,7 +739,9 @@ func (s *rpcServer) Stop() error { var err error select { case err = <-ch: + s.Lock() s.started = false + s.Unlock() } return err