package grpc import ( "context" "io" "sync" "go.unistack.org/micro/v3/client" "google.golang.org/grpc" ) // Implements the streamer interface type grpcStream struct { grpc.ClientStream context context.Context err error request client.Request response client.Response close func(err error) conn *poolConn sync.RWMutex closed bool } func (g *grpcStream) Context() context.Context { return g.context } func (g *grpcStream) Request() client.Request { return g.request } func (g *grpcStream) Response() client.Response { return g.response } func (g *grpcStream) Send(msg interface{}) error { if err := g.ClientStream.SendMsg(msg); err != nil { g.setError(err) return err } return nil } func (g *grpcStream) SendMsg(msg interface{}) error { if err := g.ClientStream.SendMsg(msg); err != nil { g.setError(err) return err } return nil } func (g *grpcStream) Recv(msg interface{}) (err error) { defer g.setError(err) if err = g.ClientStream.RecvMsg(msg); err != nil { // #202 - inconsistent gRPC stream behavior // the only way to tell if the stream is done is when we get a EOF on the Recv // here we should close the underlying gRPC ClientConn closeErr := g.Close() if err == io.EOF && closeErr != nil { err = closeErr } return err } return } func (g *grpcStream) RecvMsg(msg interface{}) (err error) { defer g.setError(err) if err = g.ClientStream.RecvMsg(msg); err != nil { // #202 - inconsistent gRPC stream behavior // the only way to tell if the stream is done is when we get a EOF on the Recv // here we should close the underlying gRPC ClientConn closeErr := g.Close() if err == io.EOF && closeErr != nil { err = closeErr } return err } return } func (g *grpcStream) Error() error { g.RLock() defer g.RUnlock() return g.err } func (g *grpcStream) setError(e error) { g.Lock() g.err = e g.Unlock() } // Close the gRPC send stream // #202 - inconsistent gRPC stream behavior // The underlying gRPC stream should not be closed here since the // stream should still be able to receive after this function call // TODO: should the conn be closed in another way? func (g *grpcStream) Close() error { g.Lock() defer g.Unlock() if g.closed { return nil } // close the connection g.closed = true g.close(g.err) return g.ClientStream.CloseSend() } func (g *grpcStream) CloseSend() error { g.Lock() defer g.Unlock() if g.closed { return nil } // close the connection g.closed = true g.close(g.err) return g.ClientStream.CloseSend() }