Fix rpc go routine leak

This commit is contained in:
Asim Aslam
2019-11-27 17:12:07 +00:00
parent 266b6dbc64
commit af94899b54
6 changed files with 198 additions and 101 deletions

View File

@@ -81,6 +81,20 @@ type router struct {
subscribers map[string][]*subscriber
}
// rpcRouter encapsulates functions that become a server.Router
type rpcRouter struct {
h func(context.Context, Request, interface{}) error
m func(context.Context, Message) error
}
func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error {
return r.m(ctx, msg)
}
func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error {
return r.h(ctx, req, rsp)
}
func newRpcRouter() *router {
return &router{
serviceMap: make(map[string]*service),

View File

@@ -58,19 +58,6 @@ func newRpcServer(opts ...Option) Server {
}
}
type rpcRouter struct {
h func(context.Context, Request, interface{}) error
m func(context.Context, Message) error
}
func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error {
return r.m(ctx, msg)
}
func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error {
return r.h(ctx, req, rsp)
}
// HandleEvent handles inbound messages to the service directly
// TODO: handle requests from an event. We won't send a response.
func (s *rpcServer) HandleEvent(e broker.Event) error {
@@ -141,26 +128,34 @@ func (s *rpcServer) HandleEvent(e broker.Event) error {
// ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
var wg sync.WaitGroup
var mtx sync.RWMutex
// global error tracking
var gerr error
// streams are multiplexed on Micro-Stream or Micro-Id header
sockets := make(map[string]*socket.Socket)
pool := socket.NewPool()
// get global waitgroup
s.Lock()
gg := s.wg
s.Unlock()
// waitgroup to wait for processing to finish
wg := &waitGroup{
gg: gg,
}
defer func() {
// wait till done
wg.Wait()
// only wait if there's no error
if gerr == nil {
// wait till done
wg.Wait()
}
// close all the sockets for this connection
pool.Close()
// close underlying socket
sock.Close()
// close the sockets
mtx.Lock()
for id, psock := range sockets {
psock.Close()
delete(sockets, id)
}
mtx.Unlock()
// recover any panics
if r := recover(); r != nil {
log.Log("panic recovered: ", r)
@@ -170,7 +165,12 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
for {
var msg transport.Message
// process inbound messages one at a time
if err := sock.Recv(&msg); err != nil {
// set a global error and return
// we're saying we essentially can't
// use the socket anymore
gerr = err
return
}
@@ -185,9 +185,12 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
msg.Header["Micro-Error"] = err.Error()
}
// write back some 200
sock.Send(&transport.Message{
if err := sock.Send(&transport.Message{
Header: msg.Header,
})
}); err != nil {
gerr = err
break
}
// we're done
continue
}
@@ -205,57 +208,52 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
id = msg.Header["Micro-Id"]
}
// we're starting processing
wg.Add(1)
// check stream id
var stream bool
// add to wait group if "wait" is opt-in
s.Lock()
swg := s.wg
s.Unlock()
if swg != nil {
swg.Add(1)
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
stream = true
}
// check we have an existing socket
mtx.RLock()
psock, ok := sockets[id]
mtx.RUnlock()
// check if we have an existing socket
psock, ok := pool.Get(id)
// got the socket
// if we don't have a socket and its a stream
if !ok && stream {
// check if its a last stream EOS error
err := msg.Header["Micro-Error"]
if err == lastStreamResponseError.Error() {
pool.Release(psock)
continue
}
}
// got an existing socket already
if ok {
// accept the message
// we're starting processing
wg.Add(1)
// pass the message to that existing socket
if err := psock.Accept(&msg); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
}
// done(1)
if swg != nil {
swg.Done()
// release the socket if there's an error
pool.Release(psock)
}
// done waiting
wg.Done()
// continue to the next message
continue
}
// no socket was found
psock = socket.New()
// no socket was found so its new
// set the local and remote values
psock.SetLocal(sock.Local())
psock.SetRemote(sock.Remote())
// load the socket
// load the socket with the current message
psock.Accept(&msg)
// save a new socket
mtx.Lock()
sockets[id] = psock
mtx.Unlock()
// now walk the usual path
// we use this Timeout header to set a server deadline
@@ -292,38 +290,33 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// setup old protocol
cf := setupProtocol(&msg)
// no old codec
// no legacy codec needed
if cf == nil {
// TODO: needs better error handling
var err error
// try get a new codec
if cf, err = s.newCodec(ct); err != nil {
sock.Send(&transport.Message{
// no codec found so send back an error
if err := sock.Send(&transport.Message{
Header: map[string]string{
"Content-Type": "text/plain",
},
Body: []byte(err.Error()),
})
if swg != nil {
swg.Done()
}); err != nil {
gerr = err
}
wg.Done()
return
// release the socket we just created
pool.Release(psock)
// now continue
continue
}
}
// create a new rpc codec based on the pseudo socket and codec
rcodec := newRpcCodec(&msg, psock, cf)
// check the protocol as well
protocol := rcodec.String()
// check stream id
var stream bool
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
stream = true
}
// internal request
request := &rpcRequest{
service: getHeader("Micro-Service", msg.Header),
@@ -363,11 +356,11 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
r = rpcRouter{h: handler}
}
// wait for processing to exit
wg.Add(1)
// process the outbound messages from the socket
go func(id string, psock *socket.Socket) {
// wait for processing to exit
wg.Add(1)
defer func() {
// TODO: don't hack this but if its grpc just break out of the stream
// We do this because the underlying connection is h2 and its a stream
@@ -375,7 +368,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
case "grpc":
sock.Close()
}
// release the socket
pool.Release(psock)
// signal we're done
wg.Done()
}()
@@ -383,10 +378,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// get the message from our internal handler/stream
m := new(transport.Message)
if err := psock.Process(m); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
return
}
@@ -399,7 +390,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// serve the request in a go routine as this may be a stream
go func(id string, psock *socket.Socket) {
defer psock.Close()
// add to the waitgroup
wg.Add(1)
defer func() {
// release the socket
pool.Release(psock)
// signal we're done
wg.Done()
}()
// serve the actual request using the request router
if serveRequestError := r.ServeRequest(ctx, request, response); serveRequestError != nil {
@@ -416,21 +415,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// could not write error response
if writeError != nil && !alreadyClosed {
log.Logf("rpc: unable to write error response: %v", writeError)
log.Debugf("rpc: unable to write error response: %v", writeError)
}
}
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
// signal we're done
if swg != nil {
swg.Done()
}
// done with this socket
wg.Done()
}(id, psock)
}
}

32
server/rpc_util.go Normal file
View File

@@ -0,0 +1,32 @@
package server
import (
"sync"
)
// waitgroup for global management of connections
type waitGroup struct {
// local waitgroup
lg sync.WaitGroup
// global waitgroup
gg *sync.WaitGroup
}
func (w *waitGroup) Add(i int) {
w.lg.Add(i)
if w.gg != nil {
w.gg.Add(i)
}
}
func (w *waitGroup) Done() {
w.lg.Done()
if w.gg != nil {
w.gg.Done()
}
}
func (w *waitGroup) Wait() {
// only wait on local group
w.lg.Wait()
}