From 22aa7d14b369331c5fff7a92db5a58ef57f71a2b Mon Sep 17 00:00:00 2001 From: jamsonzan <31194068+jamsonzan@users.noreply.github.com> Date: Fri, 27 Dec 2019 20:25:58 +0800 Subject: [PATCH] support streams pool for grpc (#1062) * Update grpc_pool.go * Update options.go * Update grpc.go * Update grpc_pool_test.go * streams pool for grpc * use busy list to speed up allocate while pool is very busy * fix idle bug --- client/grpc/grpc.go | 24 ++++- client/grpc/grpc_pool.go | 172 +++++++++++++++++++++++++++------- client/grpc/grpc_pool_test.go | 10 +- client/grpc/options.go | 31 ++++++ 4 files changed, 199 insertions(+), 38 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index a593b690..792563d7 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -244,6 +244,28 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client }, nil } +func (g *grpcClient) poolMaxStreams() int { + if g.opts.Context == nil { + return DefaultPoolMaxStreams + } + v := g.opts.Context.Value(poolMaxStreams{}) + if v == nil { + return DefaultPoolMaxStreams + } + return v.(int) +} + +func (g *grpcClient) poolMaxIdle() int { + if g.opts.Context == nil { + return DefaultPoolMaxIdle + } + v := g.opts.Context.Value(poolMaxIdle{}) + if v == nil { + return DefaultPoolMaxIdle + } + return v.(int) +} + func (g *grpcClient) maxRecvMsgSizeValue() int { if g.opts.Context == nil { return DefaultMaxRecvMsgSize @@ -639,8 +661,8 @@ func newClient(opts ...client.Option) client.Client { rc := &grpcClient{ once: sync.Once{}, opts: options, - pool: newPool(options.PoolSize, options.PoolTTL), } + rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) diff --git a/client/grpc/grpc_pool.go b/client/grpc/grpc_pool.go index ea084e74..ba076b2a 100644 --- a/client/grpc/grpc_pool.go +++ b/client/grpc/grpc_pool.go @@ -10,74 +10,182 @@ import ( type pool struct { size int ttl int64 + + // max streams on a *poolConn + maxStreams int + // max idle conns + maxIdle int sync.Mutex - conns map[string][]*poolConn + conns map[string]*streamsPool +} + +type streamsPool struct { + // head of list + head *poolConn + // busy conns list + busy *poolConn + // the siza of list + count int + // idle conn + idle int } type poolConn struct { + // grpc conn *grpc.ClientConn + err error + addr string + + // pool and streams pool + pool *pool + sp *streamsPool + streams int created int64 + + // list + pre *poolConn + next *poolConn + in bool } -func newPool(size int, ttl time.Duration) *pool { +func newPool(size int, ttl time.Duration, idle int, ms int) *pool { + if ms <= 0 { + ms = 1 + } + if idle < 0 { + idle = 0 + } return &pool{ size: size, ttl: int64(ttl.Seconds()), - conns: make(map[string][]*poolConn), + maxStreams: ms, + maxIdle: idle, + conns: make(map[string]*streamsPool), } } func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) { - p.Lock() - conns := p.conns[addr] now := time.Now().Unix() - - // while we have conns check age and then return one - // otherwise we'll create a new conn - for len(conns) > 0 { - conn := conns[len(conns)-1] - conns = conns[:len(conns)-1] - p.conns[addr] = conns - - // if conn is old kill it and move on - if d := now - conn.created; d > p.ttl { - conn.ClientConn.Close() + p.Lock() + sp, ok := p.conns[addr] + if !ok { + sp = &streamsPool{head:&poolConn{}, busy:&poolConn{}, count:0, idle:0} + p.conns[addr] = sp + } + // while we have conns check streams and then return one + // otherwise we'll create a new conn + conn := sp.head.next + for conn != nil { + // a old conn + if now-conn.created > p.ttl { + next := conn.next + if conn.streams == 0 { + removeConn(conn) + conn.ClientConn.Close() + sp.idle-- + } + conn = next continue } - - // we got a good conn, lets unlock and return it + // a busy conn + if conn.streams >= p.maxStreams { + next := conn.next + removeConn(conn) + addConnAfter(conn, sp.busy) + conn = next + continue + } + // a idle conn + if conn.streams == 0 { + sp.idle-- + } + // a good conn + conn.streams++ p.Unlock() - return conn, nil } - p.Unlock() - // create new conn + // create new conn cc, err := grpc.Dial(addr, opts...) if err != nil { return nil, err } + conn = &poolConn{cc,nil,addr,p,sp,1,time.Now().Unix(), nil, nil, false} - return &poolConn{cc, time.Now().Unix()}, nil + // add conn to streams pool + p.Lock() + if sp.count < p.size { + addConnAfter(conn, sp.head) + } + p.Unlock() + + return conn, nil } func (p *pool) release(addr string, conn *poolConn, err error) { - // don't store the conn if it has errored - if err != nil { - conn.ClientConn.Close() - return - } - - // otherwise put it back for reuse p.Lock() - conns := p.conns[addr] - if len(conns) >= p.size { + p, sp, created := conn.pool, conn.sp, conn.created + // try to add conn + if !conn.in && sp.count < p.size { + addConnAfter(conn, sp.head) + } + if !conn.in { p.Unlock() conn.ClientConn.Close() return } - p.conns[addr] = append(conns, conn) + // a busy conn + if conn.streams >= p.maxStreams { + removeConn(conn) + addConnAfter(conn, sp.head) + } + conn.streams-- + // if streams == 0, we can do something + if conn.streams == 0 { + // 1. it has errored + // 2. too many idle conn or + // 3. conn is too old + now := time.Now().Unix() + if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { + removeConn(conn) + p.Unlock() + conn.ClientConn.Close() + return + } + sp.idle++ + } p.Unlock() + return +} + +func (conn *poolConn)Close() { + conn.pool.release(conn.addr, conn, conn.err) +} + +func removeConn(conn *poolConn) { + if conn.pre != nil { + conn.pre.next = conn.next + } + if conn.next != nil { + conn.next.pre = conn.pre + } + conn.pre = nil + conn.next = nil + conn.in = false + conn.sp.count-- + return +} + +func addConnAfter(conn *poolConn, after *poolConn) { + conn.next = after.next + conn.pre = after + if after.next != nil { + after.next.pre = conn + } + after.next = conn + conn.in = true + conn.sp.count++ + return } diff --git a/client/grpc/grpc_pool_test.go b/client/grpc/grpc_pool_test.go index d5c1fdb0..aa087ae0 100644 --- a/client/grpc/grpc_pool_test.go +++ b/client/grpc/grpc_pool_test.go @@ -11,7 +11,7 @@ import ( pb "google.golang.org/grpc/examples/helloworld/helloworld" ) -func testPool(t *testing.T, size int, ttl time.Duration) { +func testPool(t *testing.T, size int, ttl time.Duration, idle int, ms int) { // setup server l, err := net.Listen("tcp", ":0") if err != nil { @@ -26,7 +26,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) { defer s.Stop() // zero pool - p := newPool(size, ttl) + p := newPool(size, ttl, idle, ms) for i := 0; i < 10; i++ { // get a conn @@ -50,7 +50,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) { p.release(l.Addr().String(), cc, nil) p.Lock() - if i := len(p.conns[l.Addr().String()]); i > size { + if i := p.conns[l.Addr().String()].count; i > size { p.Unlock() t.Fatalf("pool size %d is greater than expected %d", i, size) } @@ -59,6 +59,6 @@ func testPool(t *testing.T, size int, ttl time.Duration) { } func TestGRPCPool(t *testing.T) { - testPool(t, 0, time.Minute) - testPool(t, 2, time.Minute) + testPool(t, 0, time.Minute, 10, 2) + testPool(t, 2, time.Minute, 10, 1) } diff --git a/client/grpc/options.go b/client/grpc/options.go index e7f2fceb..8384c48a 100644 --- a/client/grpc/options.go +++ b/client/grpc/options.go @@ -11,6 +11,14 @@ import ( ) var ( + // DefaultPoolMaxStreams maximum streams on a connectioin + // (20) + DefaultPoolMaxStreams = 20 + + // DefaultPoolMaxIdle maximum idle conns of a pool + // (50) + DefaultPoolMaxIdle = 50 + // DefaultMaxRecvMsgSize maximum message that client can receive // (4 MB). DefaultMaxRecvMsgSize = 1024 * 1024 * 4 @@ -20,6 +28,8 @@ var ( DefaultMaxSendMsgSize = 1024 * 1024 * 4 ) +type poolMaxStreams struct {} +type poolMaxIdle struct {} type codecsKey struct{} type tlsAuth struct{} type maxRecvMsgSizeKey struct{} @@ -27,6 +37,26 @@ type maxSendMsgSizeKey struct{} type grpcDialOptions struct{} type grpcCallOptions struct{} +// maximum streams on a connectioin +func PoolMaxStreams(n int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, poolMaxStreams{}, n) + } +} + +// maximum idle conns of a pool +func PoolMaxIdle(d int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, poolMaxIdle{}, d) + } +} + // gRPC Codec to be used to encode/decode requests for a given content type func Codec(contentType string, c encoding.Codec) client.Option { return func(o *client.Options) { @@ -99,3 +129,4 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption { o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) } } +