support streams pool for grpc (#1062)
* Update grpc_pool.go * Update options.go * Update grpc.go * Update grpc_pool_test.go * streams pool for grpc * use busy list to speed up allocate while pool is very busy * fix idle bug
This commit is contained in:
parent
f0fd4f90a5
commit
01ad981688
24
grpc.go
24
grpc.go
@ -244,6 +244,28 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) poolMaxStreams() int {
|
||||||
|
if g.opts.Context == nil {
|
||||||
|
return DefaultPoolMaxStreams
|
||||||
|
}
|
||||||
|
v := g.opts.Context.Value(poolMaxStreams{})
|
||||||
|
if v == nil {
|
||||||
|
return DefaultPoolMaxStreams
|
||||||
|
}
|
||||||
|
return v.(int)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) poolMaxIdle() int {
|
||||||
|
if g.opts.Context == nil {
|
||||||
|
return DefaultPoolMaxIdle
|
||||||
|
}
|
||||||
|
v := g.opts.Context.Value(poolMaxIdle{})
|
||||||
|
if v == nil {
|
||||||
|
return DefaultPoolMaxIdle
|
||||||
|
}
|
||||||
|
return v.(int)
|
||||||
|
}
|
||||||
|
|
||||||
func (g *grpcClient) maxRecvMsgSizeValue() int {
|
func (g *grpcClient) maxRecvMsgSizeValue() int {
|
||||||
if g.opts.Context == nil {
|
if g.opts.Context == nil {
|
||||||
return DefaultMaxRecvMsgSize
|
return DefaultMaxRecvMsgSize
|
||||||
@ -639,8 +661,8 @@ func newClient(opts ...client.Option) client.Client {
|
|||||||
rc := &grpcClient{
|
rc := &grpcClient{
|
||||||
once: sync.Once{},
|
once: sync.Once{},
|
||||||
opts: options,
|
opts: options,
|
||||||
pool: newPool(options.PoolSize, options.PoolTTL),
|
|
||||||
}
|
}
|
||||||
|
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
|
||||||
|
|
||||||
c := client.Client(rc)
|
c := client.Client(rc)
|
||||||
|
|
||||||
|
166
grpc_pool.go
166
grpc_pool.go
@ -11,47 +11,100 @@ type pool struct {
|
|||||||
size int
|
size int
|
||||||
ttl int64
|
ttl int64
|
||||||
|
|
||||||
|
// max streams on a *poolConn
|
||||||
|
maxStreams int
|
||||||
|
// max idle conns
|
||||||
|
maxIdle int
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
conns map[string][]*poolConn
|
conns map[string]*streamsPool
|
||||||
|
}
|
||||||
|
|
||||||
|
type streamsPool struct {
|
||||||
|
// head of list
|
||||||
|
head *poolConn
|
||||||
|
// busy conns list
|
||||||
|
busy *poolConn
|
||||||
|
// the siza of list
|
||||||
|
count int
|
||||||
|
// idle conn
|
||||||
|
idle int
|
||||||
}
|
}
|
||||||
|
|
||||||
type poolConn struct {
|
type poolConn struct {
|
||||||
|
// grpc conn
|
||||||
*grpc.ClientConn
|
*grpc.ClientConn
|
||||||
|
err error
|
||||||
|
addr string
|
||||||
|
|
||||||
|
// pool and streams pool
|
||||||
|
pool *pool
|
||||||
|
sp *streamsPool
|
||||||
|
streams int
|
||||||
created int64
|
created int64
|
||||||
|
|
||||||
|
// list
|
||||||
|
pre *poolConn
|
||||||
|
next *poolConn
|
||||||
|
in bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPool(size int, ttl time.Duration) *pool {
|
func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
|
||||||
|
if ms <= 0 {
|
||||||
|
ms = 1
|
||||||
|
}
|
||||||
|
if idle < 0 {
|
||||||
|
idle = 0
|
||||||
|
}
|
||||||
return &pool{
|
return &pool{
|
||||||
size: size,
|
size: size,
|
||||||
ttl: int64(ttl.Seconds()),
|
ttl: int64(ttl.Seconds()),
|
||||||
conns: make(map[string][]*poolConn),
|
maxStreams: ms,
|
||||||
|
maxIdle: idle,
|
||||||
|
conns: make(map[string]*streamsPool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
||||||
p.Lock()
|
|
||||||
conns := p.conns[addr]
|
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
|
p.Lock()
|
||||||
// while we have conns check age and then return one
|
sp, ok := p.conns[addr]
|
||||||
|
if !ok {
|
||||||
|
sp = &streamsPool{head:&poolConn{}, busy:&poolConn{}, count:0, idle:0}
|
||||||
|
p.conns[addr] = sp
|
||||||
|
}
|
||||||
|
// while we have conns check streams and then return one
|
||||||
// otherwise we'll create a new conn
|
// otherwise we'll create a new conn
|
||||||
for len(conns) > 0 {
|
conn := sp.head.next
|
||||||
conn := conns[len(conns)-1]
|
for conn != nil {
|
||||||
conns = conns[:len(conns)-1]
|
// a old conn
|
||||||
p.conns[addr] = conns
|
if now-conn.created > p.ttl {
|
||||||
|
next := conn.next
|
||||||
// if conn is old kill it and move on
|
if conn.streams == 0 {
|
||||||
if d := now - conn.created; d > p.ttl {
|
removeConn(conn)
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
|
sp.idle--
|
||||||
|
}
|
||||||
|
conn = next
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// a busy conn
|
||||||
// we got a good conn, lets unlock and return it
|
if conn.streams >= p.maxStreams {
|
||||||
|
next := conn.next
|
||||||
|
removeConn(conn)
|
||||||
|
addConnAfter(conn, sp.busy)
|
||||||
|
conn = next
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// a idle conn
|
||||||
|
if conn.streams == 0 {
|
||||||
|
sp.idle--
|
||||||
|
}
|
||||||
|
// a good conn
|
||||||
|
conn.streams++
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
// create new conn
|
// create new conn
|
||||||
@ -59,25 +112,80 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
conn = &poolConn{cc,nil,addr,p,sp,1,time.Now().Unix(), nil, nil, false}
|
||||||
|
|
||||||
return &poolConn{cc, time.Now().Unix()}, nil
|
// add conn to streams pool
|
||||||
|
p.Lock()
|
||||||
|
if sp.count < p.size {
|
||||||
|
addConnAfter(conn, sp.head)
|
||||||
|
}
|
||||||
|
p.Unlock()
|
||||||
|
|
||||||
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pool) release(addr string, conn *poolConn, err error) {
|
func (p *pool) release(addr string, conn *poolConn, err error) {
|
||||||
// don't store the conn if it has errored
|
p.Lock()
|
||||||
if err != nil {
|
p, sp, created := conn.pool, conn.sp, conn.created
|
||||||
|
// try to add conn
|
||||||
|
if !conn.in && sp.count < p.size {
|
||||||
|
addConnAfter(conn, sp.head)
|
||||||
|
}
|
||||||
|
if !conn.in {
|
||||||
|
p.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// a busy conn
|
||||||
|
if conn.streams >= p.maxStreams {
|
||||||
|
removeConn(conn)
|
||||||
|
addConnAfter(conn, sp.head)
|
||||||
|
}
|
||||||
|
conn.streams--
|
||||||
|
// if streams == 0, we can do something
|
||||||
|
if conn.streams == 0 {
|
||||||
|
// 1. it has errored
|
||||||
|
// 2. too many idle conn or
|
||||||
|
// 3. conn is too old
|
||||||
|
now := time.Now().Unix()
|
||||||
|
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
||||||
|
removeConn(conn)
|
||||||
|
p.Unlock()
|
||||||
|
conn.ClientConn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sp.idle++
|
||||||
|
}
|
||||||
|
p.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// otherwise put it back for reuse
|
func (conn *poolConn)Close() {
|
||||||
p.Lock()
|
conn.pool.release(conn.addr, conn, conn.err)
|
||||||
conns := p.conns[addr]
|
}
|
||||||
if len(conns) >= p.size {
|
|
||||||
p.Unlock()
|
func removeConn(conn *poolConn) {
|
||||||
conn.ClientConn.Close()
|
if conn.pre != nil {
|
||||||
|
conn.pre.next = conn.next
|
||||||
|
}
|
||||||
|
if conn.next != nil {
|
||||||
|
conn.next.pre = conn.pre
|
||||||
|
}
|
||||||
|
conn.pre = nil
|
||||||
|
conn.next = nil
|
||||||
|
conn.in = false
|
||||||
|
conn.sp.count--
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.conns[addr] = append(conns, conn)
|
|
||||||
p.Unlock()
|
func addConnAfter(conn *poolConn, after *poolConn) {
|
||||||
|
conn.next = after.next
|
||||||
|
conn.pre = after
|
||||||
|
if after.next != nil {
|
||||||
|
after.next.pre = conn
|
||||||
|
}
|
||||||
|
after.next = conn
|
||||||
|
conn.in = true
|
||||||
|
conn.sp.count++
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
pb "google.golang.org/grpc/examples/helloworld/helloworld"
|
pb "google.golang.org/grpc/examples/helloworld/helloworld"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testPool(t *testing.T, size int, ttl time.Duration) {
|
func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) {
|
||||||
// setup server
|
// setup server
|
||||||
l, err := net.Listen("tcp", ":0")
|
l, err := net.Listen("tcp", ":0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -26,7 +26,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
|
|||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
// zero pool
|
// zero pool
|
||||||
p := newPool(size, ttl)
|
p := newPool(size, ttl, idle, ms)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
// get a conn
|
// get a conn
|
||||||
@ -50,7 +50,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
|
|||||||
p.release(l.Addr().String(), cc, nil)
|
p.release(l.Addr().String(), cc, nil)
|
||||||
|
|
||||||
p.Lock()
|
p.Lock()
|
||||||
if i := len(p.conns[l.Addr().String()]); i > size {
|
if i := p.conns[l.Addr().String()].count; i > size {
|
||||||
p.Unlock()
|
p.Unlock()
|
||||||
t.Fatalf("pool size %d is greater than expected %d", i, size)
|
t.Fatalf("pool size %d is greater than expected %d", i, size)
|
||||||
}
|
}
|
||||||
@ -59,6 +59,6 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGRPCPool(t *testing.T) {
|
func TestGRPCPool(t *testing.T) {
|
||||||
testPool(t, 0, time.Minute)
|
testPool(t, 0, time.Minute, 10, 2)
|
||||||
testPool(t, 2, time.Minute)
|
testPool(t, 2, time.Minute, 10, 1)
|
||||||
}
|
}
|
||||||
|
30
options.go
30
options.go
@ -11,6 +11,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// DefaultPoolMaxStreams maximum streams on a connectioin
|
||||||
|
// (20)
|
||||||
|
DefaultPoolMaxStreams = 20
|
||||||
|
|
||||||
|
// DefaultPoolMaxIdle maximum idle conns of a pool
|
||||||
|
// (50)
|
||||||
|
DefaultPoolMaxIdle = 50
|
||||||
|
|
||||||
// DefaultMaxRecvMsgSize maximum message that client can receive
|
// DefaultMaxRecvMsgSize maximum message that client can receive
|
||||||
// (4 MB).
|
// (4 MB).
|
||||||
DefaultMaxRecvMsgSize = 1024 * 1024 * 4
|
DefaultMaxRecvMsgSize = 1024 * 1024 * 4
|
||||||
@ -20,6 +28,8 @@ var (
|
|||||||
DefaultMaxSendMsgSize = 1024 * 1024 * 4
|
DefaultMaxSendMsgSize = 1024 * 1024 * 4
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type poolMaxStreams struct {}
|
||||||
|
type poolMaxIdle struct {}
|
||||||
type codecsKey struct{}
|
type codecsKey struct{}
|
||||||
type tlsAuth struct{}
|
type tlsAuth struct{}
|
||||||
type maxRecvMsgSizeKey struct{}
|
type maxRecvMsgSizeKey struct{}
|
||||||
@ -27,6 +37,26 @@ type maxSendMsgSizeKey struct{}
|
|||||||
type grpcDialOptions struct{}
|
type grpcDialOptions struct{}
|
||||||
type grpcCallOptions struct{}
|
type grpcCallOptions struct{}
|
||||||
|
|
||||||
|
// maximum streams on a connectioin
|
||||||
|
func PoolMaxStreams(n int) client.Option {
|
||||||
|
return func(o *client.Options) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, poolMaxStreams{}, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// maximum idle conns of a pool
|
||||||
|
func PoolMaxIdle(d int) client.Option {
|
||||||
|
return func(o *client.Options) {
|
||||||
|
if o.Context == nil {
|
||||||
|
o.Context = context.Background()
|
||||||
|
}
|
||||||
|
o.Context = context.WithValue(o.Context, poolMaxIdle{}, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// gRPC Codec to be used to encode/decode requests for a given content type
|
// gRPC Codec to be used to encode/decode requests for a given content type
|
||||||
func Codec(contentType string, c encoding.Codec) client.Option {
|
func Codec(contentType string, c encoding.Codec) client.Option {
|
||||||
return func(o *client.Options) {
|
return func(o *client.Options) {
|
||||||
|
Loading…
Reference in New Issue
Block a user