ensure we close the grpc stream (#1098)
* ensure we close the grpc stream * use g.Close * use closed bool flag for checking connection close
This commit is contained in:
parent
8c86ad526f
commit
5422d368c0
1
grpc.go
1
grpc.go
@ -211,6 +211,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
|||||||
if opts := g.getGrpcCallOptions(); opts != nil {
|
if opts := g.getGrpcCallOptions(); opts != nil {
|
||||||
grpcCallOptions = append(grpcCallOptions, opts...)
|
grpcCallOptions = append(grpcCallOptions, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
|
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
||||||
|
14
stream.go
14
stream.go
@ -12,6 +12,7 @@ import (
|
|||||||
// Implements the streamer interface
|
// Implements the streamer interface
|
||||||
type grpcStream struct {
|
type grpcStream struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
closed bool
|
||||||
err error
|
err error
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
stream grpc.ClientStream
|
stream grpc.ClientStream
|
||||||
@ -46,7 +47,7 @@ func (g *grpcStream) Recv(msg interface{}) (err error) {
|
|||||||
// #202 - inconsistent gRPC stream behavior
|
// #202 - inconsistent gRPC stream behavior
|
||||||
// the only way to tell if the stream is done is when we get a EOF on the Recv
|
// the only way to tell if the stream is done is when we get a EOF on the Recv
|
||||||
// here we should close the underlying gRPC ClientConn
|
// here we should close the underlying gRPC ClientConn
|
||||||
closeErr := g.conn.Close()
|
closeErr := g.Close()
|
||||||
if err == io.EOF && closeErr != nil {
|
if err == io.EOF && closeErr != nil {
|
||||||
err = closeErr
|
err = closeErr
|
||||||
}
|
}
|
||||||
@ -72,5 +73,14 @@ func (g *grpcStream) setError(e error) {
|
|||||||
// stream should still be able to receive after this function call
|
// stream should still be able to receive after this function call
|
||||||
// TODO: should the conn be closed in another way?
|
// TODO: should the conn be closed in another way?
|
||||||
func (g *grpcStream) Close() error {
|
func (g *grpcStream) Close() error {
|
||||||
return g.stream.CloseSend()
|
g.Lock()
|
||||||
|
defer g.Unlock()
|
||||||
|
|
||||||
|
if g.closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
g.closed = true
|
||||||
|
g.stream.CloseSend()
|
||||||
|
return g.conn.Close()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user