diff --git a/grpc.go b/grpc.go index f00a12b..1d4eea0 100644 --- a/grpc.go +++ b/grpc.go @@ -212,7 +212,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client grpcCallOptions = append(grpcCallOptions, opts...) } - st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) + // create a new cancelling context + newCtx, cancel := context.WithCancel(ctx) + + st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } @@ -240,6 +243,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client response: rsp, stream: st, conn: cc, + cancel: cancel, }, nil } diff --git a/stream.go b/stream.go index ea9da43..e5ebd15 100644 --- a/stream.go +++ b/stream.go @@ -19,6 +19,7 @@ type grpcStream struct { request client.Request response client.Response context context.Context + cancel func() } func (g *grpcStream) Context() context.Context { @@ -79,7 +80,8 @@ func (g *grpcStream) Close() error { if g.closed { return nil } - + // cancel the context + g.cancel() g.closed = true g.stream.CloseSend() return g.conn.Close()