diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index bb968d94..86025a9d 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -53,6 +53,8 @@ type grpcServer struct { opts server.Options handlers map[string]server.Handler subscribers map[*subscriber][]broker.Subscriber + // marks the serve as started + started bool // used for first registration registered bool } @@ -454,7 +456,10 @@ func (g *grpcServer) newCodec(contentType string) (codec.NewCodec, error) { } func (g *grpcServer) Options() server.Options { + g.RLock() opts := g.opts + g.RUnlock() + return opts } @@ -700,7 +705,14 @@ func (g *grpcServer) Deregister() error { } func (g *grpcServer) Start() error { - config := g.opts + g.RLock() + if g.started { + g.RUnlock() + return nil + } + g.RUnlock() + + config := g.Options() // micro: config.Transport.Listen(config.Address) ts, err := net.Listen("tcp", config.Address) @@ -781,13 +793,32 @@ func (g *grpcServer) Start() error { config.Broker.Disconnect() }() + // mark the server as started + g.Lock() + g.started = true + g.Unlock() + return nil } func (g *grpcServer) Stop() error { + g.RLock() + if !g.started { + g.RUnlock() + return nil + } + g.RUnlock() + ch := make(chan error) g.exit <- ch - return <-ch + + var err error + select { + case err = <-ch: + g.started = false + } + + return err } func (g *grpcServer) String() string { diff --git a/server/rpc_server.go b/server/rpc_server.go index f7ce74ec..4b825a8b 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -30,6 +30,8 @@ type rpcServer struct { opts Options handlers map[string]Handler subscribers map[*subscriber][]broker.Subscriber + // marks the serve as started + started bool // used for first registration registered bool // graceful exit @@ -584,6 +586,13 @@ func (s *rpcServer) Deregister() error { } func (s *rpcServer) Start() error { + s.RLock() + if s.started { + s.RUnlock() + return nil + } + s.RUnlock() + config := s.Options() // start listening on the transport @@ -708,13 +717,32 @@ func (s *rpcServer) Start() error { s.Unlock() }() + // mark the server as started + s.Lock() + s.started = true + s.Unlock() + return nil } func (s *rpcServer) Stop() error { + s.RLock() + if !s.started { + s.RUnlock() + return nil + } + s.RUnlock() + ch := make(chan error) s.exit <- ch - return <-ch + + var err error + select { + case err = <-ch: + s.started = false + } + + return err } func (s *rpcServer) String() string {