cancel stream

This commit is contained in:
Asim Aslam 2020-01-19 22:53:56 +00:00 committed by Vasiliy Tolstov
parent 5422d368c0
commit b65757048a
2 changed files with 8 additions and 2 deletions

View File

@ -212,7 +212,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
grpcCallOptions = append(grpcCallOptions, opts...) grpcCallOptions = append(grpcCallOptions, opts...)
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) // create a new cancelling context
newCtx, cancel := context.WithCancel(ctx)
st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@ -240,6 +243,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
response: rsp, response: rsp,
stream: st, stream: st,
conn: cc, conn: cc,
cancel: cancel,
}, nil }, nil
} }

View File

@ -19,6 +19,7 @@ type grpcStream struct {
request client.Request request client.Request
response client.Response response client.Response
context context.Context context context.Context
cancel func()
} }
func (g *grpcStream) Context() context.Context { func (g *grpcStream) Context() context.Context {
@ -79,7 +80,8 @@ func (g *grpcStream) Close() error {
if g.closed { if g.closed {
return nil return nil
} }
// cancel the context
g.cancel()
g.closed = true g.closed = true
g.stream.CloseSend() g.stream.CloseSend()
return g.conn.Close() return g.conn.Close()