From af94899b540718181a1058f48b0bfaf48f8941b2 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 27 Nov 2019 17:12:07 +0000 Subject: [PATCH] Fix rpc go routine leak --- server/rpc_router.go | 14 +++ server/rpc_server.go | 185 +++++++++++++++++------------------- server/rpc_util.go | 32 +++++++ transport/http_transport.go | 8 +- util/socket/pool.go | 56 +++++++++++ util/socket/socket.go | 4 +- 6 files changed, 198 insertions(+), 101 deletions(-) create mode 100644 server/rpc_util.go create mode 100644 util/socket/pool.go diff --git a/server/rpc_router.go b/server/rpc_router.go index 493f2af7..fddce6fd 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -81,6 +81,20 @@ type router struct { subscribers map[string][]*subscriber } +// rpcRouter encapsulates functions that become a server.Router +type rpcRouter struct { + h func(context.Context, Request, interface{}) error + m func(context.Context, Message) error +} + +func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error { + return r.m(ctx, msg) +} + +func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error { + return r.h(ctx, req, rsp) +} + func newRpcRouter() *router { return &router{ serviceMap: make(map[string]*service), diff --git a/server/rpc_server.go b/server/rpc_server.go index 607ead53..f1c3e713 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -58,19 +58,6 @@ func newRpcServer(opts ...Option) Server { } } -type rpcRouter struct { - h func(context.Context, Request, interface{}) error - m func(context.Context, Message) error -} - -func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error { - return r.m(ctx, msg) -} - -func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error { - return r.h(ctx, req, rsp) -} - // HandleEvent handles inbound messages to the service directly // TODO: handle requests from an event. We won't send a response. 
func (s *rpcServer) HandleEvent(e broker.Event) error { @@ -141,26 +128,34 @@ func (s *rpcServer) HandleEvent(e broker.Event) error { // ServeConn serves a single connection func (s *rpcServer) ServeConn(sock transport.Socket) { - var wg sync.WaitGroup - var mtx sync.RWMutex + // global error tracking + var gerr error // streams are multiplexed on Micro-Stream or Micro-Id header - sockets := make(map[string]*socket.Socket) + pool := socket.NewPool() + + // get global waitgroup + s.Lock() + gg := s.wg + s.Unlock() + + // waitgroup to wait for processing to finish + wg := &waitGroup{ + gg: gg, + } defer func() { - // wait till done - wg.Wait() + // only wait if there's no error + if gerr == nil { + // wait till done + wg.Wait() + } + + // close all the sockets for this connection + pool.Close() // close underlying socket sock.Close() - // close the sockets - mtx.Lock() - for id, psock := range sockets { - psock.Close() - delete(sockets, id) - } - mtx.Unlock() - // recover any panics if r := recover(); r != nil { log.Log("panic recovered: ", r) @@ -170,7 +165,12 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { for { var msg transport.Message + // process inbound messages one at a time if err := sock.Recv(&msg); err != nil { + // set a global error and return + // we're saying we essentially can't + // use the socket anymore + gerr = err return } @@ -185,9 +185,12 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { msg.Header["Micro-Error"] = err.Error() } // write back some 200 - sock.Send(&transport.Message{ + if err := sock.Send(&transport.Message{ Header: msg.Header, - }) + }); err != nil { + gerr = err + break + } // we're done continue } @@ -205,57 +208,52 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { id = msg.Header["Micro-Id"] } - // we're starting processing - wg.Add(1) + // check stream id + var stream bool - // add to wait group if "wait" is opt-in - s.Lock() - swg := s.wg - s.Unlock() - - if swg != nil { - swg.Add(1) + if v := 
getHeader("Micro-Stream", msg.Header); len(v) > 0 { + stream = true } - // check we have an existing socket - mtx.RLock() - psock, ok := sockets[id] - mtx.RUnlock() + // check if we have an existing socket + psock, ok := pool.Get(id) - // got the socket + // if we don't have a socket and it's a stream + if !ok && stream { + // check if it's a last stream EOS error + err := msg.Header["Micro-Error"] + if err == lastStreamResponseError.Error() { + pool.Release(psock) + continue + } + } + + // got an existing socket already if ok { - // accept the message + // we're starting processing + wg.Add(1) + + // pass the message to that existing socket if err := psock.Accept(&msg); err != nil { - // delete the socket - mtx.Lock() - delete(sockets, id) - mtx.Unlock() - } - - // done(1) - if swg != nil { - swg.Done() + // release the socket if there's an error + pool.Release(psock) } + // done waiting wg.Done() // continue to the next message continue } - // no socket was found - psock = socket.New() + // no socket was found so it's new + // set the local and remote values psock.SetLocal(sock.Local()) psock.SetRemote(sock.Remote()) - // load the socket + // load the socket with the current message psock.Accept(&msg) - // save a new socket - mtx.Lock() - sockets[id] = psock - mtx.Unlock() - // now walk the usual path // we use this Timeout header to set a server deadline @@ -292,38 +290,33 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // setup old protocol cf := setupProtocol(&msg) - // no old codec + // no legacy codec needed if cf == nil { - // TODO: needs better error handling var err error + // try to get a new codec if cf, err = s.newCodec(ct); err != nil { - sock.Send(&transport.Message{ + // no codec found so send back an error + if err := sock.Send(&transport.Message{ Header: map[string]string{ "Content-Type": "text/plain", }, Body: []byte(err.Error()), - }) - - if swg != nil { - swg.Done() + }); err != nil { + gerr = err } - wg.Done() - - return + // release the 
socket we just created + pool.Release(psock) + // now continue + continue } } + // create a new rpc codec based on the pseudo socket and codec rcodec := newRpcCodec(&msg, psock, cf) + // check the protocol as well protocol := rcodec.String() - // check stream id - var stream bool - - if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { - stream = true - } - // internal request request := &rpcRequest{ service: getHeader("Micro-Service", msg.Header), @@ -363,11 +356,11 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { r = rpcRouter{h: handler} } - // wait for processing to exit - wg.Add(1) - // process the outbound messages from the socket go func(id string, psock *socket.Socket) { + // wait for processing to exit + wg.Add(1) + defer func() { // TODO: don't hack this but if its grpc just break out of the stream // We do this because the underlying connection is h2 and its a stream @@ -375,7 +368,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { case "grpc": sock.Close() } - + // release the socket + pool.Release(psock) + // signal we're done wg.Done() }() @@ -383,10 +378,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // get the message from our internal handler/stream m := new(transport.Message) if err := psock.Process(m); err != nil { - // delete the socket - mtx.Lock() - delete(sockets, id) - mtx.Unlock() return } @@ -399,7 +390,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // serve the request in a go routine as this may be a stream go func(id string, psock *socket.Socket) { - defer psock.Close() + // add to the waitgroup + wg.Add(1) + + defer func() { + // release the socket + pool.Release(psock) + // signal we're done + wg.Done() + }() // serve the actual request using the request router if serveRequestError := r.ServeRequest(ctx, request, response); serveRequestError != nil { @@ -416,21 +415,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // could not write error response if writeError != nil && 
!alreadyClosed { - log.Logf("rpc: unable to write error response: %v", writeError) + log.Debugf("rpc: unable to write error response: %v", writeError) } } - - mtx.Lock() - delete(sockets, id) - mtx.Unlock() - - // signal we're done - if swg != nil { - swg.Done() - } - - // done with this socket - wg.Done() }(id, psock) } } diff --git a/server/rpc_util.go b/server/rpc_util.go new file mode 100644 index 00000000..a15be0ac --- /dev/null +++ b/server/rpc_util.go @@ -0,0 +1,32 @@ +package server + +import ( + "sync" +) + +// waitgroup for global management of connections +type waitGroup struct { + // local waitgroup + lg sync.WaitGroup + // global waitgroup + gg *sync.WaitGroup +} + +func (w *waitGroup) Add(i int) { + w.lg.Add(i) + if w.gg != nil { + w.gg.Add(i) + } +} + +func (w *waitGroup) Done() { + w.lg.Done() + if w.gg != nil { + w.gg.Done() + } +} + +func (w *waitGroup) Wait() { + // only wait on local group + w.lg.Wait() +} diff --git a/transport/http_transport.go b/transport/http_transport.go index c9878073..1ce109aa 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -285,8 +285,14 @@ func (h *httpTransportSocket) Recv(m *Message) error { func (h *httpTransportSocket) Send(m *Message) error { if h.r.ProtoMajor == 1 { + // make copy of header + hdr := make(http.Header) + for k, v := range h.r.Header { + hdr[k] = v + } + rsp := &http.Response{ - Header: h.r.Header, + Header: hdr, Body: ioutil.NopCloser(bytes.NewReader(m.Body)), Status: "200 OK", StatusCode: 200, diff --git a/util/socket/pool.go b/util/socket/pool.go new file mode 100644 index 00000000..9439f131 --- /dev/null +++ b/util/socket/pool.go @@ -0,0 +1,56 @@ +package socket + +import ( + "sync" +) + +type Pool struct { + sync.RWMutex + pool map[string]*Socket +} + +func (p *Pool) Get(id string) (*Socket, bool) { + // attempt to get existing socket + p.RLock() + socket, ok := p.pool[id] + if ok { + p.RUnlock() + return socket, ok + } + p.RUnlock() + + // create new socket + 
socket = New(id) + // save socket + p.Lock() + p.pool[id] = socket + p.Unlock() + // return socket + return socket, false +} + +func (p *Pool) Release(s *Socket) { + p.Lock() + defer p.Unlock() + + // close the socket + s.Close() + delete(p.pool, s.id) +} + +// Close the pool and delete all the sockets +func (p *Pool) Close() { + p.Lock() + defer p.Unlock() + for id, sock := range p.pool { + sock.Close() + delete(p.pool, id) + } +} + +// NewPool returns a new socket pool +func NewPool() *Pool { + return &Pool{ + pool: make(map[string]*Socket), + } +} diff --git a/util/socket/socket.go b/util/socket/socket.go index 29ba5006..f536ad7d 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -9,6 +9,7 @@ import ( // Socket is our pseudo socket for transport.Socket type Socket struct { + id string // closed closed chan bool // remote addr @@ -119,8 +120,9 @@ func (s *Socket) Close() error { // New returns a new pseudo socket which can be used in the place of a transport socket. // Messages are sent to the socket via Accept and receives from the socket via Process. // SetLocal/SetRemote should be called before using the socket. -func New() *Socket { +func New(id string) *Socket { return &Socket{ + id: id, closed: make(chan bool), local: "local", remote: "remote",