diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 3d158ea0..f00a12b0 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -211,6 +211,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client if opts := g.getGrpcCallOptions(); opts != nil { grpcCallOptions = append(grpcCallOptions, opts...) } + st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) diff --git a/client/grpc/stream.go b/client/grpc/stream.go index c6b38c15..ea9da43f 100644 --- a/client/grpc/stream.go +++ b/client/grpc/stream.go @@ -12,6 +12,7 @@ import ( // Implements the streamer interface type grpcStream struct { sync.RWMutex + closed bool err error conn *grpc.ClientConn stream grpc.ClientStream @@ -46,7 +47,7 @@ func (g *grpcStream) Recv(msg interface{}) (err error) { // #202 - inconsistent gRPC stream behavior // the only way to tell if the stream is done is when we get a EOF on the Recv // here we should close the underlying gRPC ClientConn - closeErr := g.conn.Close() + closeErr := g.Close() if err == io.EOF && closeErr != nil { err = closeErr } @@ -72,5 +73,14 @@ func (g *grpcStream) setError(e error) { // stream should still be able to receive after this function call // TODO: should the conn be closed in another way? func (g *grpcStream) Close() error { - return g.stream.CloseSend() + g.Lock() + defer g.Unlock() + + if g.closed { + return nil + } + + g.closed = true + g.stream.CloseSend() + return g.conn.Close() }