Merge pull request #696 from milosgajdos83/server-idempotent

Make server Start() and Stop() idempotent
This commit is contained in:
Asim Aslam 2019-08-23 15:12:33 +01:00 committed by GitHub
commit 718780367e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 3 deletions

View File

@ -53,6 +53,8 @@ type grpcServer struct {
opts server.Options opts server.Options
handlers map[string]server.Handler handlers map[string]server.Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
// marks the serve as started
started bool
// used for first registration // used for first registration
registered bool registered bool
} }
@ -454,7 +456,10 @@ func (g *grpcServer) newCodec(contentType string) (codec.NewCodec, error) {
} }
func (g *grpcServer) Options() server.Options { func (g *grpcServer) Options() server.Options {
g.RLock()
opts := g.opts opts := g.opts
g.RUnlock()
return opts return opts
} }
@ -700,7 +705,14 @@ func (g *grpcServer) Deregister() error {
} }
func (g *grpcServer) Start() error { func (g *grpcServer) Start() error {
config := g.opts g.RLock()
if g.started {
g.RUnlock()
return nil
}
g.RUnlock()
config := g.Options()
// micro: config.Transport.Listen(config.Address) // micro: config.Transport.Listen(config.Address)
ts, err := net.Listen("tcp", config.Address) ts, err := net.Listen("tcp", config.Address)
@ -781,13 +793,34 @@ func (g *grpcServer) Start() error {
config.Broker.Disconnect() config.Broker.Disconnect()
}() }()
// mark the server as started
g.Lock()
g.started = true
g.Unlock()
return nil return nil
} }
func (g *grpcServer) Stop() error { func (g *grpcServer) Stop() error {
g.RLock()
if !g.started {
g.RUnlock()
return nil
}
g.RUnlock()
ch := make(chan error) ch := make(chan error)
g.exit <- ch g.exit <- ch
return <-ch
var err error
select {
case err = <-ch:
g.Lock()
g.started = false
g.Unlock()
}
return err
} }
func (g *grpcServer) String() string { func (g *grpcServer) String() string {

View File

@ -30,6 +30,8 @@ type rpcServer struct {
opts Options opts Options
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
// marks the serve as started
started bool
// used for first registration // used for first registration
registered bool registered bool
// graceful exit // graceful exit
@ -584,6 +586,13 @@ func (s *rpcServer) Deregister() error {
} }
func (s *rpcServer) Start() error { func (s *rpcServer) Start() error {
s.RLock()
if s.started {
s.RUnlock()
return nil
}
s.RUnlock()
config := s.Options() config := s.Options()
// start listening on the transport // start listening on the transport
@ -708,13 +717,34 @@ func (s *rpcServer) Start() error {
s.Unlock() s.Unlock()
}() }()
// mark the server as started
s.Lock()
s.started = true
s.Unlock()
return nil return nil
} }
func (s *rpcServer) Stop() error { func (s *rpcServer) Stop() error {
s.RLock()
if !s.started {
s.RUnlock()
return nil
}
s.RUnlock()
ch := make(chan error) ch := make(chan error)
s.exit <- ch s.exit <- ch
return <-ch
var err error
select {
case err = <-ch:
s.Lock()
s.started = false
s.Unlock()
}
return err
} }
func (s *rpcServer) String() string { func (s *rpcServer) String() string {