embed grpc server stream and client so they can be accessed (#1916)

This commit is contained in:
Asim Aslam 2020-08-09 15:43:41 +01:00 committed by Vasiliy Tolstov
parent 6cb3fad421
commit 1003b968cf
2 changed files with 13 additions and 7 deletions

View File

@ -238,15 +238,15 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
// setup the stream response // setup the stream response
stream := &grpcStream{ stream := &grpcStream{
context: ctx, ClientStream: st,
request: req, context: ctx,
request: req,
response: &response{ response: &response{
conn: cc, conn: cc,
stream: st, stream: st,
codec: cf, codec: cf,
gcodec: codec, gcodec: codec,
}, },
stream: st,
conn: cc, conn: cc,
cancel: cancel, cancel: cancel,
} }

View File

@ -11,11 +11,13 @@ import (
// Implements the streamer interface // Implements the streamer interface
type grpcStream struct { type grpcStream struct {
// embed so we can access if need be
grpc.ClientStream
sync.RWMutex sync.RWMutex
closed bool closed bool
err error err error
conn *grpc.ClientConn conn *grpc.ClientConn
stream grpc.ClientStream
request client.Request request client.Request
response client.Response response client.Response
context context.Context context context.Context
@ -35,7 +37,7 @@ func (g *grpcStream) Response() client.Response {
} }
func (g *grpcStream) Send(msg interface{}) error { func (g *grpcStream) Send(msg interface{}) error {
if err := g.stream.SendMsg(msg); err != nil { if err := g.ClientStream.SendMsg(msg); err != nil {
g.setError(err) g.setError(err)
return err return err
} }
@ -44,7 +46,8 @@ func (g *grpcStream) Send(msg interface{}) error {
func (g *grpcStream) Recv(msg interface{}) (err error) { func (g *grpcStream) Recv(msg interface{}) (err error) {
defer g.setError(err) defer g.setError(err)
if err = g.stream.RecvMsg(msg); err != nil {
if err = g.ClientStream.RecvMsg(msg); err != nil {
// #202 - inconsistent gRPC stream behavior // #202 - inconsistent gRPC stream behavior
// the only way to tell if the stream is done is when we get a EOF on the Recv // the only way to tell if the stream is done is when we get a EOF on the Recv
// here we should close the underlying gRPC ClientConn // here we should close the underlying gRPC ClientConn
@ -52,7 +55,10 @@ func (g *grpcStream) Recv(msg interface{}) (err error) {
if err == io.EOF && closeErr != nil { if err == io.EOF && closeErr != nil {
err = closeErr err = closeErr
} }
return err
} }
return return
} }
@ -83,6 +89,6 @@ func (g *grpcStream) Close() error {
// cancel the context // cancel the context
g.cancel() g.cancel()
g.closed = true g.closed = true
g.stream.CloseSend() g.ClientStream.CloseSend()
return g.conn.Close() return g.conn.Close()
} }