diff --git a/client/rpc_client.go b/client/rpc_client.go index 05ba8345..737bc89e 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -82,7 +82,11 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r defer c.Close() client := rpc.NewClientWithCodec(newRpcPlusCodec(msg, c)) - return client.Call(ctx, request.Method(), request.Request(), response) + err = client.Call(ctx, request.Method(), request.Request(), response) + if err != nil { + return err + } + return client.Close() } func (r *rpcClient) stream(ctx context.Context, address string, request Request, responseChan interface{}) (Streamer, error) { diff --git a/transport/http_transport.go b/transport/http_transport.go index 4825b197..9b6afabc 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -4,10 +4,12 @@ import ( "bufio" "bytes" "errors" + "io" "io/ioutil" "net" "net/http" "net/url" + "sync" ) type httpTransport struct{} @@ -19,6 +21,7 @@ type httpTransportClient struct { buff *bufio.Reader dialOpts dialOptions r chan *http.Request + once sync.Once } type httpTransportSocket struct { @@ -63,7 +66,11 @@ func (h *httpTransportClient) Send(m *Message) error { func (h *httpTransportClient) Recv(m *Message) error { var r *http.Request if !h.dialOpts.stream { - r = <-h.r + rc, ok := <-h.r + if !ok { + return io.EOF + } + r = rc } rsp, err := http.ReadResponse(h.buff, r) @@ -96,6 +103,9 @@ func (h *httpTransportClient) Recv(m *Message) error { func (h *httpTransportClient) Close() error { h.buff.Reset(nil) + h.once.Do(func() { + close(h.r) + }) return h.conn.Close() }