ensure we close the grpc stream (#1098)

* ensure we close the grpc stream

* use g.Close

* use closed bool flag for checking connection close
This commit is contained in:
Asim Aslam 2020-01-09 17:00:14 +00:00 committed by GitHub
parent a90a74c9e2
commit 37d1139a57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 2 deletions

View File

@ -211,6 +211,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
if opts := g.getGrpcCallOptions(); opts != nil { if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...) grpcCallOptions = append(grpcCallOptions, opts...)
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))

View File

@ -12,6 +12,7 @@ import (
// Implements the streamer interface // Implements the streamer interface
type grpcStream struct { type grpcStream struct {
sync.RWMutex sync.RWMutex
closed bool
err error err error
conn *grpc.ClientConn conn *grpc.ClientConn
stream grpc.ClientStream stream grpc.ClientStream
@ -46,7 +47,7 @@ func (g *grpcStream) Recv(msg interface{}) (err error) {
// #202 - inconsistent gRPC stream behavior // #202 - inconsistent gRPC stream behavior
// the only way to tell if the stream is done is when we get a EOF on the Recv // the only way to tell if the stream is done is when we get a EOF on the Recv
// here we should close the underlying gRPC ClientConn // here we should close the underlying gRPC ClientConn
closeErr := g.conn.Close() closeErr := g.Close()
if err == io.EOF && closeErr != nil { if err == io.EOF && closeErr != nil {
err = closeErr err = closeErr
} }
@ -72,5 +73,14 @@ func (g *grpcStream) setError(e error) {
// stream should still be able to receive after this function call // stream should still be able to receive after this function call
// TODO: should the conn be closed in another way? // TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error { func (g *grpcStream) Close() error {
return g.stream.CloseSend() g.Lock()
defer g.Unlock()
if g.closed {
return nil
}
g.closed = true
g.stream.CloseSend()
return g.conn.Close()
} }