Merge pull request #171 from micro/wait
Wait for requests to finish before closing transport
This commit is contained in:
commit
334efcf212
@ -6,6 +6,17 @@ import (
|
|||||||
|
|
||||||
type serverKey struct{}
|
type serverKey struct{}
|
||||||
|
|
||||||
|
func wait(ctx context.Context) bool {
|
||||||
|
if ctx == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
wait, ok := ctx.Value("wait").(bool)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return wait
|
||||||
|
}
|
||||||
|
|
||||||
func FromContext(ctx context.Context) (Server, bool) {
|
func FromContext(ctx context.Context) (Server, bool) {
|
||||||
c, ok := ctx.Value(serverKey{}).(Server)
|
c, ok := ctx.Value(serverKey{}).(Server)
|
||||||
return c, ok
|
return c, ok
|
||||||
|
@ -165,6 +165,16 @@ func RegisterTTL(t time.Duration) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait tells the server to wait for requests to finish before exiting
|
||||||
|
func Wait(b bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, "wait", b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Adds a handler Wrapper to a list of options passed into the server
|
// Adds a handler Wrapper to a list of options passed into the server
|
||||||
func WrapHandler(w HandlerWrapper) Option {
|
func WrapHandler(w HandlerWrapper) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@ -30,6 +30,8 @@ type rpcServer struct {
|
|||||||
subscribers map[*subscriber][]broker.Subscriber
|
subscribers map[*subscriber][]broker.Subscriber
|
||||||
// used for first registration
|
// used for first registration
|
||||||
registered bool
|
registered bool
|
||||||
|
// graceful exit
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRpcServer(opts ...Option) Server {
|
func newRpcServer(opts ...Option) Server {
|
||||||
@ -100,11 +102,18 @@ func (s *rpcServer) accept(sock transport.Socket) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add to wait group
|
||||||
|
s.wg.Add(1)
|
||||||
|
|
||||||
// TODO: needs better error handling
|
// TODO: needs better error handling
|
||||||
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
|
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
|
||||||
log.Logf("Unexpected error serving request, closing socket: %v", err)
|
log.Logf("Unexpected error serving request, closing socket: %v", err)
|
||||||
|
s.wg.Done()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// finish request
|
||||||
|
s.wg.Done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,8 +380,18 @@ func (s *rpcServer) Start() error {
|
|||||||
go ts.Accept(s.accept)
|
go ts.Accept(s.accept)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
// wait for exit
|
||||||
ch := <-s.exit
|
ch := <-s.exit
|
||||||
|
|
||||||
|
// wait for requests to finish
|
||||||
|
if wait(s.opts.Context) {
|
||||||
|
s.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// close transport listener
|
||||||
ch <- ts.Close()
|
ch <- ts.Close()
|
||||||
|
|
||||||
|
// disconnect the broker
|
||||||
config.Broker.Disconnect()
|
config.Broker.Disconnect()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -226,11 +226,15 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
|
|||||||
fn = opts.SubWrappers[i-1](fn)
|
fn = opts.SubWrappers[i-1](fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
go fn(ctx, &rpcPublication{
|
s.wg.Add(1)
|
||||||
topic: sb.topic,
|
go func() {
|
||||||
contentType: ct,
|
fn(ctx, &rpcPublication{
|
||||||
message: req.Interface(),
|
topic: sb.topic,
|
||||||
})
|
contentType: ct,
|
||||||
|
message: req.Interface(),
|
||||||
|
})
|
||||||
|
s.wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user