diff --git a/grpc.go b/grpc.go index e2a0f9d..98d40f5 100644 --- a/grpc.go +++ b/grpc.go @@ -238,15 +238,15 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client // setup the stream response stream := &grpcStream{ - context: ctx, - request: req, + ClientStream: st, + context: ctx, + request: req, response: &response{ conn: cc, stream: st, codec: cf, gcodec: codec, }, - stream: st, conn: cc, cancel: cancel, } diff --git a/stream.go b/stream.go index 5051718..d27493b 100644 --- a/stream.go +++ b/stream.go @@ -11,11 +11,13 @@ import ( // Implements the streamer interface type grpcStream struct { + // embed so we can access if need be + grpc.ClientStream + sync.RWMutex closed bool err error conn *grpc.ClientConn - stream grpc.ClientStream request client.Request response client.Response context context.Context @@ -35,7 +37,7 @@ func (g *grpcStream) Response() client.Response { } func (g *grpcStream) Send(msg interface{}) error { - if err := g.stream.SendMsg(msg); err != nil { + if err := g.ClientStream.SendMsg(msg); err != nil { g.setError(err) return err } @@ -44,7 +46,8 @@ func (g *grpcStream) Send(msg interface{}) error { func (g *grpcStream) Recv(msg interface{}) (err error) { defer g.setError(err) - if err = g.stream.RecvMsg(msg); err != nil { + + if err = g.ClientStream.RecvMsg(msg); err != nil { // #202 - inconsistent gRPC stream behavior // the only way to tell if the stream is done is when we get a EOF on the Recv // here we should close the underlying gRPC ClientConn @@ -52,7 +55,10 @@ func (g *grpcStream) Recv(msg interface{}) (err error) { if err == io.EOF && closeErr != nil { err = closeErr } + + return err } + return } @@ -83,6 +89,6 @@ func (g *grpcStream) Close() error { // cancel the context g.cancel() g.closed = true - g.stream.CloseSend() + g.ClientStream.CloseSend() return g.conn.Close() }