From 726793b6fa55e4218c3ab96948e3d8d9ce66fba5 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 May 2017 19:21:41 +0100 Subject: [PATCH 1/5] Wait for requests to finish before closing transport --- server/context.go | 8 ++++++++ server/rpc_server.go | 20 ++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/server/context.go b/server/context.go index 88d19257..c7b08b93 100644 --- a/server/context.go +++ b/server/context.go @@ -6,6 +6,14 @@ import ( type serverKey struct{} +func wait(ctx context.Context) bool { + if ctx == nil { + return false + } + wait, _ := ctx.Value("wait").(bool) + return wait +} + func FromContext(ctx context.Context) (Server, bool) { c, ok := ctx.Value(serverKey{}).(Server) return c, ok diff --git a/server/rpc_server.go b/server/rpc_server.go index 6f97ab9c..48fd97c1 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -30,6 +30,8 @@ type rpcServer struct { subscribers map[*subscriber][]broker.Subscriber // used for first registration registered bool + // graceful exit + wg sync.WaitGroup } func newRpcServer(opts ...Option) Server { @@ -44,6 +46,7 @@ func newRpcServer(opts ...Option) Server { handlers: make(map[string]Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), + wg: sync.WaitGroup{}, } } @@ -100,11 +103,18 @@ func (s *rpcServer) accept(sock transport.Socket) { } } + // add to wait group + s.wg.Add(1) + // TODO: needs better error handling if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { log.Logf("Unexpected error serving request, closing socket: %v", err) + s.wg.Done() return } + + // finish request + s.wg.Done() } } @@ -371,8 +381,18 @@ func (s *rpcServer) Start() error { go ts.Accept(s.accept) go func() { + // wait for exit ch := <-s.exit + + // wait for requests to finish + if wait(s.opts.Context) { + s.wg.Wait() + } + + // close transport listener ch <- ts.Close() + + // disconnect the broker config.Broker.Disconnect() }() From 276a0118da7d7783573d56460c4879eeb5a79d1e Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 May 2017 19:33:11 +0100 Subject: [PATCH 2/5] be pedantic --- server/context.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/context.go b/server/context.go index c7b08b93..627caa54 100644 --- a/server/context.go +++ b/server/context.go @@ -10,7 +10,10 @@ func wait(ctx context.Context) bool { if ctx == nil { return false } - wait, _ := ctx.Value("wait").(bool) + wait, ok := ctx.Value("wait").(bool) + if !ok { + return false + } return wait } From 2ee22f53367adf7d4cb8c7e9658d30c9ed38a825 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 May 2017 19:35:16 +0100 Subject: [PATCH 3/5] strip init of wg --- server/rpc_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/rpc_server.go b/server/rpc_server.go index 48fd97c1..6b6af066 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -46,7 +46,6 @@ func newRpcServer(opts ...Option) Server { handlers: make(map[string]Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error), - wg: sync.WaitGroup{}, } } From f7a9207c7acf342c66322cae104e4887f3b38804 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 May 2017 19:47:41 +0100 Subject: [PATCH 4/5] wait for subscribers to complete --- server/subscriber.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/server/subscriber.go b/server/subscriber.go index 3c30a40d..30594348 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -226,11 +226,15 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle fn = opts.SubWrappers[i-1](fn) } - go fn(ctx, &rpcPublication{ - topic: sb.topic, - contentType: ct, - message: req.Interface(), - }) + s.wg.Add(1) + go func() { + fn(ctx, &rpcPublication{ + topic: sb.topic, + contentType: ct, + message: req.Interface(), + }) + s.wg.Done() + }() } return nil } From e1a1f1c0f0b9d879f368ce1da289443831d0a4d0 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 May 2017 19:47:50 +0100 Subject: [PATCH 5/5] add wait option --- server/options.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/options.go b/server/options.go index edaeceea..c4602498 100644 --- a/server/options.go +++ b/server/options.go @@ -165,6 +165,16 @@ func RegisterTTL(t time.Duration) Option { } } +// Wait tells the server to wait for requests to finish before exiting +func Wait(b bool) Option { + return func(o *Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, "wait", b) + } +} + // Adds a handler Wrapper to a list of options passed into the server func WrapHandler(w HandlerWrapper) Option { return func(o *Options) {