client/grpc: fix stream closed bug (#2009)

* client/grpc: fix stream closed bug

* client/grpc: don't use dial context for the stream
This commit is contained in:
ben-toogood 2020-09-17 14:08:21 +01:00 committed by Vasiliy Tolstov
parent 7ef834f0a8
commit 0fda1710d8
3 changed files with 23 additions and 24 deletions

31
grpc.go
View File

@ -172,15 +172,6 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
var dialCtx context.Context
var cancel context.CancelFunc
if opts.DialTimeout >= 0 {
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
} else {
dialCtx, cancel = context.WithCancel(ctx)
}
defer cancel()
wc := wrapCodec{cf} wc := wrapCodec{cf}
grpcDialOptions := []grpc.DialOption{ grpcDialOptions := []grpc.DialOption{
@ -192,7 +183,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcDialOptions = append(grpcDialOptions, opts...) grpcDialOptions = append(grpcDialOptions, opts...)
} }
cc, err := grpc.DialContext(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.getConn(addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -211,16 +202,16 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcCallOptions = append(grpcCallOptions, opts...) grpcCallOptions = append(grpcCallOptions, opts...)
} }
// create a new cancelling context var cancel context.CancelFunc
newCtx, cancel := context.WithCancel(ctx) ctx, cancel = context.WithCancel(ctx)
st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
if err != nil { if err != nil {
// we need to cleanup as we dialled and created a context // we need to cleanup as we dialled and created a context
// cancel the context // cancel the context
cancel() cancel()
// close the connection // release the connection
cc.Close() g.pool.release(addr, cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@ -247,7 +238,15 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
gcodec: codec, gcodec: codec,
}, },
conn: cc, conn: cc,
cancel: cancel, close: func(err error) {
// cancel the context if an error occured
if err != nil {
cancel()
}
// defer execution of release
g.pool.release(addr, cc, err)
},
} }
// set the stream as the response // set the stream as the response

View File

@ -10,7 +10,7 @@ import (
) )
type response struct { type response struct {
conn *grpc.ClientConn conn *poolConn
stream grpc.ClientStream stream grpc.ClientStream
codec encoding.Codec codec encoding.Codec
gcodec codec.Codec gcodec codec.Codec

View File

@ -17,11 +17,11 @@ type grpcStream struct {
sync.RWMutex sync.RWMutex
closed bool closed bool
err error err error
conn *grpc.ClientConn conn *poolConn
request client.Request request client.Request
response client.Response response client.Response
context context.Context context context.Context
cancel func() close func(err error)
} }
func (g *grpcStream) Context() context.Context { func (g *grpcStream) Context() context.Context {
@ -86,9 +86,9 @@ func (g *grpcStream) Close() error {
if g.closed { if g.closed {
return nil return nil
} }
// cancel the context
g.cancel() // close the connection
g.closed = true g.closed = true
g.ClientStream.CloseSend() g.close(g.err)
return g.conn.Close() return g.ClientStream.CloseSend()
} }