diff --git a/client/rpc_client.go b/client/rpc_client.go index a5be4f4e..e251b1a4 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -13,12 +13,14 @@ import ( "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/transport" + "sync/atomic" ) type rpcClient struct { once sync.Once opts Options pool *pool + seq uint64 } func newRpcClient(opt ...Option) Client { @@ -28,6 +30,7 @@ func newRpcClient(opt ...Option) Client { once: sync.Once{}, opts: opts, pool: newPool(opts.PoolSize, opts.PoolTTL), + seq: 0, } c := Client(rc) @@ -84,11 +87,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp r.pool.release(address, c, grr) }() + seq := r.seq + atomic.AddUint64(&r.seq, 1) + stream := &rpcStream{ context: ctx, request: req, closed: make(chan bool), codec: newRpcPlusCodec(msg, c, cf), + seq: seq, } defer stream.Close() diff --git a/client/rpc_stream.go b/client/rpc_stream.go index d508b860..1fb76506 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -44,8 +44,7 @@ func (r *rpcStream) Send(msg interface{}) error { } seq := r.seq - r.seq++ - + req := request{ Service: r.request.Service(), Seq: seq,