From bffd55f500647b8b6fa51edf50672a4c218d6700 Mon Sep 17 00:00:00 2001 From: Asim Date: Mon, 28 Dec 2015 19:11:10 +0000 Subject: [PATCH] Channel rather than mutex to check is closed --- client/rpc_client.go | 3 +++ client/rpc_stream.go | 28 ++++++++++++++++++++-------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index fa49100f..406e2836 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -136,9 +136,12 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } + var once sync.Once stream := &rpcStream{ context: ctx, request: req, + once: once, + closed: make(chan bool), codec: newRpcPlusCodec(msg, c, cf), } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 837b97d2..0f40b93f 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -13,13 +13,25 @@ import ( type rpcStream struct { sync.RWMutex seq uint64 - closed bool + once sync.Once + closed chan bool err error request Request codec clientCodec context context.Context } +func (r *rpcStream) isClosed() bool { + select { + case _, ok := <-r.closed: + if !ok { + return true + } + default: + } + return false +} + func (r *rpcStream) Context() context.Context { return r.context } @@ -32,7 +44,7 @@ func (r *rpcStream) Send(msg interface{}) error { r.Lock() defer r.Unlock() - if r.closed { + if r.isClosed() { r.err = errShutdown return errShutdown } @@ -57,14 +69,14 @@ func (r *rpcStream) Recv(msg interface{}) error { r.Lock() defer r.Unlock() - if r.closed { + if r.isClosed() { r.err = errShutdown return errShutdown } var resp response if err := r.codec.ReadResponseHeader(&resp); err != nil { - if err == io.EOF && !r.closed { + if err == io.EOF && !r.isClosed() { r.err = io.ErrUnexpectedEOF return io.ErrUnexpectedEOF } @@ -91,7 +103,7 @@ func (r *rpcStream) Recv(msg interface{}) error { } } - if r.err != nil && r.err != io.EOF && !r.closed { + if r.err != nil && r.err != io.EOF && !r.isClosed() { log.Println("rpc: client protocol error:", r.err) } @@ -105,8 +117,8 @@ func (r *rpcStream) Error() error { } func (r *rpcStream) Close() error { - r.Lock() - defer r.Unlock() - r.closed = true + r.once.Do(func() { + close(r.closed) + }) return r.codec.Close() }