export grpc pool conn
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
2a6a93a792
commit
19a469c4e2
24
grpc_pool.go
24
grpc_pool.go
@ -21,22 +21,22 @@ type ConnPool struct {
|
|||||||
|
|
||||||
type streamsPool struct {
|
type streamsPool struct {
|
||||||
// head of list
|
// head of list
|
||||||
head *poolConn
|
head *PoolConn
|
||||||
// busy conns list
|
// busy conns list
|
||||||
busy *poolConn
|
busy *PoolConn
|
||||||
// the siza of list
|
// the siza of list
|
||||||
count int
|
count int
|
||||||
// idle conn
|
// idle conn
|
||||||
idle int
|
idle int
|
||||||
}
|
}
|
||||||
|
|
||||||
type poolConn struct {
|
type PoolConn struct {
|
||||||
err error
|
err error
|
||||||
*grpc.ClientConn
|
*grpc.ClientConn
|
||||||
next *poolConn
|
next *PoolConn
|
||||||
pool *ConnPool
|
pool *ConnPool
|
||||||
sp *streamsPool
|
sp *streamsPool
|
||||||
pre *poolConn
|
pre *PoolConn
|
||||||
addr string
|
addr string
|
||||||
streams int
|
streams int
|
||||||
created int64
|
created int64
|
||||||
@ -59,7 +59,7 @@ func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
|
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) {
|
||||||
if strings.HasPrefix(addr, "http") {
|
if strings.HasPrefix(addr, "http") {
|
||||||
addr = addr[strings.Index(addr, ":")+3:]
|
addr = addr[strings.Index(addr, ":")+3:]
|
||||||
}
|
}
|
||||||
@ -67,7 +67,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
p.Lock()
|
p.Lock()
|
||||||
sp, ok := p.conns[addr]
|
sp, ok := p.conns[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0}
|
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
||||||
p.conns[addr] = sp
|
p.conns[addr] = sp
|
||||||
}
|
}
|
||||||
// while we have conns check streams and then return one
|
// while we have conns check streams and then return one
|
||||||
@ -135,7 +135,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
||||||
|
|
||||||
// add conn to streams pool
|
// add conn to streams pool
|
||||||
p.Lock()
|
p.Lock()
|
||||||
@ -147,7 +147,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Put(conn *poolConn, err error) {
|
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
p, sp, created := conn.pool, conn.sp, conn.created
|
p, sp, created := conn.pool, conn.sp, conn.created
|
||||||
// try to add conn
|
// try to add conn
|
||||||
@ -182,11 +182,11 @@ func (p *ConnPool) Put(conn *poolConn, err error) {
|
|||||||
p.Unlock()
|
p.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *poolConn) Close() {
|
func (conn *PoolConn) Close() {
|
||||||
conn.pool.Put(conn, conn.err)
|
conn.pool.Put(conn, conn.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeConn(conn *poolConn) {
|
func removeConn(conn *PoolConn) {
|
||||||
if conn.pre != nil {
|
if conn.pre != nil {
|
||||||
conn.pre.next = conn.next
|
conn.pre.next = conn.next
|
||||||
}
|
}
|
||||||
@ -199,7 +199,7 @@ func removeConn(conn *poolConn) {
|
|||||||
conn.sp.count--
|
conn.sp.count--
|
||||||
}
|
}
|
||||||
|
|
||||||
func addConnAfter(conn *poolConn, after *poolConn) {
|
func addConnAfter(conn *PoolConn, after *PoolConn) {
|
||||||
conn.next = after.next
|
conn.next = after.next
|
||||||
conn.pre = after
|
conn.pre = after
|
||||||
if after.next != nil {
|
if after.next != nil {
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// DefaultPoolMaxStreams maximum streams on a connectioin
|
// DefaultPoolMaxStreams maximum streams on a connection
|
||||||
// (20)
|
// (20)
|
||||||
DefaultPoolMaxStreams = 20
|
DefaultPoolMaxStreams = 20
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type response struct {
|
type response struct {
|
||||||
conn *poolConn
|
conn *PoolConn
|
||||||
stream grpc.ClientStream
|
stream grpc.ClientStream
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user