From 0fda1710d869f6d811a9a911f97519000c2e4239 Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Thu, 17 Sep 2020 14:08:21 +0100 Subject: [PATCH] client/grpc: fix stream closed bug (#2009) * client/grpc: fix stream closed bug * client/grpc: don't use dial context for the stream --- grpc.go | 33 ++++++++++++++++----------------- response.go | 2 +- stream.go | 12 ++++++------ 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/grpc.go b/grpc.go index f230fa4..838bead 100644 --- a/grpc.go +++ b/grpc.go @@ -172,15 +172,6 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request return errors.InternalServerError("go.micro.client", err.Error()) } - var dialCtx context.Context - var cancel context.CancelFunc - if opts.DialTimeout >= 0 { - dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) - } else { - dialCtx, cancel = context.WithCancel(ctx) - } - defer cancel() - wc := wrapCodec{cf} grpcDialOptions := []grpc.DialOption{ @@ -192,7 +183,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request grpcDialOptions = append(grpcDialOptions, opts...) } - cc, err := grpc.DialContext(dialCtx, addr, grpcDialOptions...) + cc, err := g.pool.getConn(addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -211,16 +202,16 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request grpcCallOptions = append(grpcCallOptions, opts...) } - // create a new cancelling context - newCtx, cancel := context.WithCancel(ctx) + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) - st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) if err != nil { // we need to cleanup as we dialled and created a context // cancel the context cancel() - // close the connection - cc.Close() + // release the connection + g.pool.release(addr, cc, err) // now return the error return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } @@ -246,8 +237,16 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request codec: cf, gcodec: codec, }, - conn: cc, - cancel: cancel, + conn: cc, + close: func(err error) { + // cancel the context if an error occured + if err != nil { + cancel() + } + + // defer execution of release + g.pool.release(addr, cc, err) + }, } // set the stream as the response diff --git a/response.go b/response.go index 1a871c9..a506e8a 100644 --- a/response.go +++ b/response.go @@ -10,7 +10,7 @@ import ( ) type response struct { - conn *grpc.ClientConn + conn *poolConn stream grpc.ClientStream codec encoding.Codec gcodec codec.Codec diff --git a/stream.go b/stream.go index d27493b..4c7e38f 100644 --- a/stream.go +++ b/stream.go @@ -17,11 +17,11 @@ type grpcStream struct { sync.RWMutex closed bool err error - conn *grpc.ClientConn + conn *poolConn request client.Request response client.Response context context.Context - cancel func() + close func(err error) } func (g *grpcStream) Context() context.Context { @@ -86,9 +86,9 @@ func (g *grpcStream) Close() error { if g.closed { return nil } - // cancel the context - g.cancel() + + // close the connection g.closed = true - g.ClientStream.CloseSend() - return g.conn.Close() + g.close(g.err) + return g.ClientStream.CloseSend() }