diff --git a/server/rpc_server.go b/server/rpc_server.go index 6b6af066..eeeb8ccd 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -104,16 +104,13 @@ func (s *rpcServer) accept(sock transport.Socket) { // add to wait group s.wg.Add(1) + defer s.wg.Done() // TODO: needs better error handling if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { log.Logf("Unexpected error serving request, closing socket: %v", err) - s.wg.Done() return } - - // finish request - s.wg.Done() } } diff --git a/server/subscriber.go b/server/subscriber.go index 30594348..f11f4356 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -228,12 +228,12 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle s.wg.Add(1) go func() { + defer s.wg.Done() fn(ctx, &rpcPublication{ topic: sb.topic, contentType: ct, message: req.Interface(), }) - s.wg.Done() }() } return nil