support streams pool for grpc (#1062)

* Update grpc_pool.go

* Update options.go

* Update grpc.go

* Update grpc_pool_test.go

* streams pool for grpc

* use busy list to speed up allocate while pool is very busy

* fix idle bug
This commit is contained in:
jamsonzan 2019-12-27 20:25:58 +08:00 committed by Asim Aslam
parent a09b6729cc
commit 22aa7d14b3
4 changed files with 199 additions and 38 deletions

View File

@ -244,6 +244,28 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
}, nil }, nil
} }
func (g *grpcClient) poolMaxStreams() int {
if g.opts.Context == nil {
return DefaultPoolMaxStreams
}
v := g.opts.Context.Value(poolMaxStreams{})
if v == nil {
return DefaultPoolMaxStreams
}
return v.(int)
}
func (g *grpcClient) poolMaxIdle() int {
if g.opts.Context == nil {
return DefaultPoolMaxIdle
}
v := g.opts.Context.Value(poolMaxIdle{})
if v == nil {
return DefaultPoolMaxIdle
}
return v.(int)
}
func (g *grpcClient) maxRecvMsgSizeValue() int { func (g *grpcClient) maxRecvMsgSizeValue() int {
if g.opts.Context == nil { if g.opts.Context == nil {
return DefaultMaxRecvMsgSize return DefaultMaxRecvMsgSize
@ -639,8 +661,8 @@ func newClient(opts ...client.Option) client.Client {
rc := &grpcClient{ rc := &grpcClient{
once: sync.Once{}, once: sync.Once{},
opts: options, opts: options,
pool: newPool(options.PoolSize, options.PoolTTL),
} }
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc) c := client.Client(rc)

View File

@ -10,74 +10,182 @@ import (
type pool struct { type pool struct {
size int size int
ttl int64 ttl int64
// max streams on a *poolConn
maxStreams int
// max idle conns
maxIdle int
sync.Mutex sync.Mutex
conns map[string][]*poolConn conns map[string]*streamsPool
}
type streamsPool struct {
// head of list
head *poolConn
// busy conns list
busy *poolConn
// the siza of list
count int
// idle conn
idle int
} }
type poolConn struct { type poolConn struct {
// grpc conn
*grpc.ClientConn *grpc.ClientConn
err error
addr string
// pool and streams pool
pool *pool
sp *streamsPool
streams int
created int64 created int64
// list
pre *poolConn
next *poolConn
in bool
} }
func newPool(size int, ttl time.Duration) *pool { func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
if ms <= 0 {
ms = 1
}
if idle < 0 {
idle = 0
}
return &pool{ return &pool{
size: size, size: size,
ttl: int64(ttl.Seconds()), ttl: int64(ttl.Seconds()),
conns: make(map[string][]*poolConn), maxStreams: ms,
maxIdle: idle,
conns: make(map[string]*streamsPool),
} }
} }
func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) { func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) {
p.Lock()
conns := p.conns[addr]
now := time.Now().Unix() now := time.Now().Unix()
p.Lock()
// while we have conns check age and then return one sp, ok := p.conns[addr]
// otherwise we'll create a new conn if !ok {
for len(conns) > 0 { sp = &streamsPool{head:&poolConn{}, busy:&poolConn{}, count:0, idle:0}
conn := conns[len(conns)-1] p.conns[addr] = sp
conns = conns[:len(conns)-1] }
p.conns[addr] = conns // while we have conns check streams and then return one
// otherwise we'll create a new conn
// if conn is old kill it and move on conn := sp.head.next
if d := now - conn.created; d > p.ttl { for conn != nil {
conn.ClientConn.Close() // a old conn
if now-conn.created > p.ttl {
next := conn.next
if conn.streams == 0 {
removeConn(conn)
conn.ClientConn.Close()
sp.idle--
}
conn = next
continue continue
} }
// a busy conn
// we got a good conn, lets unlock and return it if conn.streams >= p.maxStreams {
next := conn.next
removeConn(conn)
addConnAfter(conn, sp.busy)
conn = next
continue
}
// a idle conn
if conn.streams == 0 {
sp.idle--
}
// a good conn
conn.streams++
p.Unlock() p.Unlock()
return conn, nil return conn, nil
} }
p.Unlock() p.Unlock()
// create new conn // create new conn
cc, err := grpc.Dial(addr, opts...) cc, err := grpc.Dial(addr, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = &poolConn{cc,nil,addr,p,sp,1,time.Now().Unix(), nil, nil, false}
return &poolConn{cc, time.Now().Unix()}, nil // add conn to streams pool
p.Lock()
if sp.count < p.size {
addConnAfter(conn, sp.head)
}
p.Unlock()
return conn, nil
} }
func (p *pool) release(addr string, conn *poolConn, err error) { func (p *pool) release(addr string, conn *poolConn, err error) {
// don't store the conn if it has errored
if err != nil {
conn.ClientConn.Close()
return
}
// otherwise put it back for reuse
p.Lock() p.Lock()
conns := p.conns[addr] p, sp, created := conn.pool, conn.sp, conn.created
if len(conns) >= p.size { // try to add conn
if !conn.in && sp.count < p.size {
addConnAfter(conn, sp.head)
}
if !conn.in {
p.Unlock() p.Unlock()
conn.ClientConn.Close() conn.ClientConn.Close()
return return
} }
p.conns[addr] = append(conns, conn) // a busy conn
if conn.streams >= p.maxStreams {
removeConn(conn)
addConnAfter(conn, sp.head)
}
conn.streams--
// if streams == 0, we can do something
if conn.streams == 0 {
// 1. it has errored
// 2. too many idle conn or
// 3. conn is too old
now := time.Now().Unix()
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
removeConn(conn)
p.Unlock()
conn.ClientConn.Close()
return
}
sp.idle++
}
p.Unlock() p.Unlock()
return
}
func (conn *poolConn)Close() {
conn.pool.release(conn.addr, conn, conn.err)
}
func removeConn(conn *poolConn) {
if conn.pre != nil {
conn.pre.next = conn.next
}
if conn.next != nil {
conn.next.pre = conn.pre
}
conn.pre = nil
conn.next = nil
conn.in = false
conn.sp.count--
return
}
func addConnAfter(conn *poolConn, after *poolConn) {
conn.next = after.next
conn.pre = after
if after.next != nil {
after.next.pre = conn
}
after.next = conn
conn.in = true
conn.sp.count++
return
} }

View File

@ -11,7 +11,7 @@ import (
pb "google.golang.org/grpc/examples/helloworld/helloworld" pb "google.golang.org/grpc/examples/helloworld/helloworld"
) )
func testPool(t *testing.T, size int, ttl time.Duration) { func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) {
// setup server // setup server
l, err := net.Listen("tcp", ":0") l, err := net.Listen("tcp", ":0")
if err != nil { if err != nil {
@ -26,7 +26,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
defer s.Stop() defer s.Stop()
// zero pool // zero pool
p := newPool(size, ttl) p := newPool(size, ttl, idle, ms)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
// get a conn // get a conn
@ -50,7 +50,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
p.release(l.Addr().String(), cc, nil) p.release(l.Addr().String(), cc, nil)
p.Lock() p.Lock()
if i := len(p.conns[l.Addr().String()]); i > size { if i := p.conns[l.Addr().String()].count; i > size {
p.Unlock() p.Unlock()
t.Fatalf("pool size %d is greater than expected %d", i, size) t.Fatalf("pool size %d is greater than expected %d", i, size)
} }
@ -59,6 +59,6 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
} }
func TestGRPCPool(t *testing.T) { func TestGRPCPool(t *testing.T) {
testPool(t, 0, time.Minute) testPool(t, 0, time.Minute, 10, 2)
testPool(t, 2, time.Minute) testPool(t, 2, time.Minute, 10, 1)
} }

View File

@ -11,6 +11,14 @@ import (
) )
var ( var (
// DefaultPoolMaxStreams maximum streams on a connectioin
// (20)
DefaultPoolMaxStreams = 20
// DefaultPoolMaxIdle maximum idle conns of a pool
// (50)
DefaultPoolMaxIdle = 50
// DefaultMaxRecvMsgSize maximum message that client can receive // DefaultMaxRecvMsgSize maximum message that client can receive
// (4 MB). // (4 MB).
DefaultMaxRecvMsgSize = 1024 * 1024 * 4 DefaultMaxRecvMsgSize = 1024 * 1024 * 4
@ -20,6 +28,8 @@ var (
DefaultMaxSendMsgSize = 1024 * 1024 * 4 DefaultMaxSendMsgSize = 1024 * 1024 * 4
) )
type poolMaxStreams struct {}
type poolMaxIdle struct {}
type codecsKey struct{} type codecsKey struct{}
type tlsAuth struct{} type tlsAuth struct{}
type maxRecvMsgSizeKey struct{} type maxRecvMsgSizeKey struct{}
@ -27,6 +37,26 @@ type maxSendMsgSizeKey struct{}
type grpcDialOptions struct{} type grpcDialOptions struct{}
type grpcCallOptions struct{} type grpcCallOptions struct{}
// maximum streams on a connectioin
func PoolMaxStreams(n int) client.Option {
return func(o *client.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, poolMaxStreams{}, n)
}
}
// maximum idle conns of a pool
func PoolMaxIdle(d int) client.Option {
return func(o *client.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, poolMaxIdle{}, d)
}
}
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) client.Option { func Codec(contentType string, c encoding.Codec) client.Option {
return func(o *client.Options) { return func(o *client.Options) {
@ -99,3 +129,4 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption {
o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts)
} }
} }