212 lines
4.0 KiB
Go
212 lines
4.0 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
)
|
|
|
|
type ConnPool struct {
|
|
conns map[string]*streamsPool
|
|
size int
|
|
ttl int64
|
|
maxStreams int
|
|
maxIdle int
|
|
sync.Mutex
|
|
}
|
|
|
|
type streamsPool struct {
|
|
// head of list
|
|
head *PoolConn
|
|
// busy conns list
|
|
busy *PoolConn
|
|
// the siza of list
|
|
count int
|
|
// idle conn
|
|
idle int
|
|
}
|
|
|
|
type PoolConn struct {
|
|
err error
|
|
*grpc.ClientConn
|
|
next *PoolConn
|
|
pool *ConnPool
|
|
sp *streamsPool
|
|
pre *PoolConn
|
|
addr string
|
|
streams int
|
|
created int64
|
|
in bool
|
|
}
|
|
|
|
func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
|
|
if ms <= 0 {
|
|
ms = 1
|
|
}
|
|
if idle < 0 {
|
|
idle = 0
|
|
}
|
|
return &ConnPool{
|
|
size: size,
|
|
ttl: int64(ttl.Seconds()),
|
|
maxStreams: ms,
|
|
maxIdle: idle,
|
|
conns: make(map[string]*streamsPool),
|
|
}
|
|
}
|
|
|
|
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) {
|
|
if strings.HasPrefix(addr, "http") {
|
|
addr = addr[strings.Index(addr, ":")+3:]
|
|
}
|
|
now := time.Now().Unix()
|
|
p.Lock()
|
|
sp, ok := p.conns[addr]
|
|
if !ok {
|
|
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
|
p.conns[addr] = sp
|
|
}
|
|
// while we have conns check streams and then return one
|
|
// otherwise we'll create a new conn
|
|
conn := sp.head.next
|
|
for conn != nil {
|
|
// check conn state
|
|
// https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
|
|
switch conn.GetState() {
|
|
case connectivity.Connecting:
|
|
conn = conn.next
|
|
continue
|
|
case connectivity.Shutdown:
|
|
next := conn.next
|
|
if conn.streams == 0 {
|
|
removeConn(conn)
|
|
sp.idle--
|
|
}
|
|
conn = next
|
|
continue
|
|
case connectivity.TransientFailure:
|
|
next := conn.next
|
|
if conn.streams == 0 {
|
|
removeConn(conn)
|
|
conn.ClientConn.Close()
|
|
sp.idle--
|
|
}
|
|
conn = next
|
|
continue
|
|
case connectivity.Ready:
|
|
case connectivity.Idle:
|
|
}
|
|
// a old conn
|
|
if now-conn.created > p.ttl {
|
|
next := conn.next
|
|
if conn.streams == 0 {
|
|
removeConn(conn)
|
|
conn.ClientConn.Close()
|
|
sp.idle--
|
|
}
|
|
conn = next
|
|
continue
|
|
}
|
|
// a busy conn
|
|
if conn.streams >= p.maxStreams {
|
|
next := conn.next
|
|
removeConn(conn)
|
|
addConnAfter(conn, sp.busy)
|
|
conn = next
|
|
continue
|
|
}
|
|
// a idle conn
|
|
if conn.streams == 0 {
|
|
sp.idle--
|
|
}
|
|
// a good conn
|
|
conn.streams++
|
|
p.Unlock()
|
|
return conn, nil
|
|
}
|
|
p.Unlock()
|
|
|
|
// nolint (TODO need fix) create new conn)
|
|
cc, err := grpc.DialContext(ctx, addr, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
|
|
|
// add conn to streams pool
|
|
p.Lock()
|
|
if sp.count < p.size {
|
|
addConnAfter(conn, sp.head)
|
|
}
|
|
p.Unlock()
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
|
p.Lock()
|
|
p, sp, created := conn.pool, conn.sp, conn.created
|
|
// try to add conn
|
|
if !conn.in && sp.count < p.size {
|
|
addConnAfter(conn, sp.head)
|
|
}
|
|
if !conn.in {
|
|
p.Unlock()
|
|
conn.ClientConn.Close()
|
|
return
|
|
}
|
|
// a busy conn
|
|
if conn.streams >= p.maxStreams {
|
|
removeConn(conn)
|
|
addConnAfter(conn, sp.head)
|
|
}
|
|
conn.streams--
|
|
// if streams == 0, we can do something
|
|
if conn.streams == 0 {
|
|
// 1. it has errored
|
|
// 2. too many idle conn or
|
|
// 3. conn is too old
|
|
now := time.Now().Unix()
|
|
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
|
removeConn(conn)
|
|
p.Unlock()
|
|
conn.ClientConn.Close()
|
|
return
|
|
}
|
|
sp.idle++
|
|
}
|
|
p.Unlock()
|
|
}
|
|
|
|
func (conn *PoolConn) Close() {
|
|
conn.pool.Put(conn, conn.err)
|
|
}
|
|
|
|
func removeConn(conn *PoolConn) {
|
|
if conn.pre != nil {
|
|
conn.pre.next = conn.next
|
|
}
|
|
if conn.next != nil {
|
|
conn.next.pre = conn.pre
|
|
}
|
|
conn.pre = nil
|
|
conn.next = nil
|
|
conn.in = false
|
|
conn.sp.count--
|
|
}
|
|
|
|
func addConnAfter(conn *PoolConn, after *PoolConn) {
|
|
conn.next = after.next
|
|
conn.pre = after
|
|
if after.next != nil {
|
|
after.next.pre = conn
|
|
}
|
|
after.next = conn
|
|
conn.in = true
|
|
conn.sp.count++
|
|
}
|