commit
edaa0a0719
@ -13,12 +13,14 @@ import (
|
|||||||
"github.com/micro/go-micro/metadata"
|
"github.com/micro/go-micro/metadata"
|
||||||
"github.com/micro/go-micro/selector"
|
"github.com/micro/go-micro/selector"
|
||||||
"github.com/micro/go-micro/transport"
|
"github.com/micro/go-micro/transport"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type rpcClient struct {
|
type rpcClient struct {
|
||||||
once sync.Once
|
once sync.Once
|
||||||
opts Options
|
opts Options
|
||||||
pool *pool
|
pool *pool
|
||||||
|
seq uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRpcClient(opt ...Option) Client {
|
func newRpcClient(opt ...Option) Client {
|
||||||
@ -28,6 +30,7 @@ func newRpcClient(opt ...Option) Client {
|
|||||||
once: sync.Once{},
|
once: sync.Once{},
|
||||||
opts: opts,
|
opts: opts,
|
||||||
pool: newPool(opts.PoolSize, opts.PoolTTL),
|
pool: newPool(opts.PoolSize, opts.PoolTTL),
|
||||||
|
seq: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
c := Client(rc)
|
c := Client(rc)
|
||||||
@ -84,11 +87,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
|||||||
r.pool.release(address, c, grr)
|
r.pool.release(address, c, grr)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
seq := r.seq
|
||||||
|
atomic.AddUint64(&r.seq, 1)
|
||||||
|
|
||||||
stream := &rpcStream{
|
stream := &rpcStream{
|
||||||
context: ctx,
|
context: ctx,
|
||||||
request: req,
|
request: req,
|
||||||
closed: make(chan bool),
|
closed: make(chan bool),
|
||||||
codec: newRpcPlusCodec(msg, c, cf),
|
codec: newRpcPlusCodec(msg, c, cf),
|
||||||
|
seq: seq,
|
||||||
}
|
}
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
|
@ -44,8 +44,7 @@ func (r *rpcStream) Send(msg interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
seq := r.seq
|
seq := r.seq
|
||||||
r.seq++
|
|
||||||
|
|
||||||
req := request{
|
req := request{
|
||||||
Service: r.request.Service(),
|
Service: r.request.Service(),
|
||||||
Seq: seq,
|
Seq: seq,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user