219 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			219 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package grpc
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/connectivity"
 | 
						|
)
 | 
						|
 | 
						|
type pool struct {
 | 
						|
	size int
 | 
						|
	ttl  int64
 | 
						|
 | 
						|
	//  max streams on a *poolConn
 | 
						|
	maxStreams int
 | 
						|
	//  max idle conns
 | 
						|
	maxIdle int
 | 
						|
 | 
						|
	sync.Mutex
 | 
						|
	conns map[string]*streamsPool
 | 
						|
}
 | 
						|
 | 
						|
type streamsPool struct {
 | 
						|
	//  head of list
 | 
						|
	head *poolConn
 | 
						|
	//  busy conns list
 | 
						|
	busy *poolConn
 | 
						|
	//  the siza of list
 | 
						|
	count int
 | 
						|
	//  idle conn
 | 
						|
	idle int
 | 
						|
}
 | 
						|
 | 
						|
type poolConn struct {
 | 
						|
	//  grpc conn
 | 
						|
	*grpc.ClientConn
 | 
						|
	err  error
 | 
						|
	addr string
 | 
						|
 | 
						|
	//  pool and streams pool
 | 
						|
	pool    *pool
 | 
						|
	sp      *streamsPool
 | 
						|
	streams int
 | 
						|
	created int64
 | 
						|
 | 
						|
	//  list
 | 
						|
	pre  *poolConn
 | 
						|
	next *poolConn
 | 
						|
	in   bool
 | 
						|
}
 | 
						|
 | 
						|
func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
 | 
						|
	if ms <= 0 {
 | 
						|
		ms = 1
 | 
						|
	}
 | 
						|
	if idle < 0 {
 | 
						|
		idle = 0
 | 
						|
	}
 | 
						|
	return &pool{
 | 
						|
		size:       size,
 | 
						|
		ttl:        int64(ttl.Seconds()),
 | 
						|
		maxStreams: ms,
 | 
						|
		maxIdle:    idle,
 | 
						|
		conns:      make(map[string]*streamsPool),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) {
 | 
						|
	now := time.Now().Unix()
 | 
						|
	p.Lock()
 | 
						|
	sp, ok := p.conns[addr]
 | 
						|
	if !ok {
 | 
						|
		sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0}
 | 
						|
		p.conns[addr] = sp
 | 
						|
	}
 | 
						|
	//  while we have conns check streams and then return one
 | 
						|
	//  otherwise we'll create a new conn
 | 
						|
	conn := sp.head.next
 | 
						|
	for conn != nil {
 | 
						|
		//  check conn state
 | 
						|
		// https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
 | 
						|
		switch conn.GetState() {
 | 
						|
		case connectivity.Connecting:
 | 
						|
			conn = conn.next
 | 
						|
			continue
 | 
						|
		case connectivity.Shutdown:
 | 
						|
			next := conn.next
 | 
						|
			if conn.streams == 0 {
 | 
						|
				removeConn(conn)
 | 
						|
				sp.idle--
 | 
						|
			}
 | 
						|
			conn = next
 | 
						|
			continue
 | 
						|
		case connectivity.TransientFailure:
 | 
						|
			next := conn.next
 | 
						|
			if conn.streams == 0 {
 | 
						|
				removeConn(conn)
 | 
						|
				conn.ClientConn.Close()
 | 
						|
				sp.idle--
 | 
						|
			}
 | 
						|
			conn = next
 | 
						|
			continue
 | 
						|
		case connectivity.Ready:
 | 
						|
		case connectivity.Idle:
 | 
						|
		}
 | 
						|
		//  a old conn
 | 
						|
		if now-conn.created > p.ttl {
 | 
						|
			next := conn.next
 | 
						|
			if conn.streams == 0 {
 | 
						|
				removeConn(conn)
 | 
						|
				conn.ClientConn.Close()
 | 
						|
				sp.idle--
 | 
						|
			}
 | 
						|
			conn = next
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		//  a busy conn
 | 
						|
		if conn.streams >= p.maxStreams {
 | 
						|
			next := conn.next
 | 
						|
			removeConn(conn)
 | 
						|
			addConnAfter(conn, sp.busy)
 | 
						|
			conn = next
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		//  a idle conn
 | 
						|
		if conn.streams == 0 {
 | 
						|
			sp.idle--
 | 
						|
		}
 | 
						|
		//  a good conn
 | 
						|
		conn.streams++
 | 
						|
		p.Unlock()
 | 
						|
		return conn, nil
 | 
						|
	}
 | 
						|
	p.Unlock()
 | 
						|
 | 
						|
	//  create new conn
 | 
						|
	cc, err := grpc.Dial(addr, opts...)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false}
 | 
						|
 | 
						|
	//  add conn to streams pool
 | 
						|
	p.Lock()
 | 
						|
	if sp.count < p.size {
 | 
						|
		addConnAfter(conn, sp.head)
 | 
						|
	}
 | 
						|
	p.Unlock()
 | 
						|
 | 
						|
	return conn, nil
 | 
						|
}
 | 
						|
 | 
						|
func (p *pool) release(addr string, conn *poolConn, err error) {
 | 
						|
	p.Lock()
 | 
						|
	p, sp, created := conn.pool, conn.sp, conn.created
 | 
						|
	//  try to add conn
 | 
						|
	if !conn.in && sp.count < p.size {
 | 
						|
		addConnAfter(conn, sp.head)
 | 
						|
	}
 | 
						|
	if !conn.in {
 | 
						|
		p.Unlock()
 | 
						|
		conn.ClientConn.Close()
 | 
						|
		return
 | 
						|
	}
 | 
						|
	//  a busy conn
 | 
						|
	if conn.streams >= p.maxStreams {
 | 
						|
		removeConn(conn)
 | 
						|
		addConnAfter(conn, sp.head)
 | 
						|
	}
 | 
						|
	conn.streams--
 | 
						|
	//  if streams == 0, we can do something
 | 
						|
	if conn.streams == 0 {
 | 
						|
		//  1. it has errored
 | 
						|
		//  2. too many idle conn or
 | 
						|
		//  3. conn is too old
 | 
						|
		now := time.Now().Unix()
 | 
						|
		if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
 | 
						|
			removeConn(conn)
 | 
						|
			p.Unlock()
 | 
						|
			conn.ClientConn.Close()
 | 
						|
			return
 | 
						|
		}
 | 
						|
		sp.idle++
 | 
						|
	}
 | 
						|
	p.Unlock()
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func (conn *poolConn) Close() {
 | 
						|
	conn.pool.release(conn.addr, conn, conn.err)
 | 
						|
}
 | 
						|
 | 
						|
func removeConn(conn *poolConn) {
 | 
						|
	if conn.pre != nil {
 | 
						|
		conn.pre.next = conn.next
 | 
						|
	}
 | 
						|
	if conn.next != nil {
 | 
						|
		conn.next.pre = conn.pre
 | 
						|
	}
 | 
						|
	conn.pre = nil
 | 
						|
	conn.next = nil
 | 
						|
	conn.in = false
 | 
						|
	conn.sp.count--
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func addConnAfter(conn *poolConn, after *poolConn) {
 | 
						|
	conn.next = after.next
 | 
						|
	conn.pre = after
 | 
						|
	if after.next != nil {
 | 
						|
		after.next.pre = conn
 | 
						|
	}
 | 
						|
	after.next = conn
 | 
						|
	conn.in = true
 | 
						|
	conn.sp.count++
 | 
						|
	return
 | 
						|
}
 |