export grpc pool conn

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-02-14 19:14:56 +03:00
parent 2adba9d0da
commit 2fbcee7b59
4 changed files with 15 additions and 15 deletions

@ -21,22 +21,22 @@ type ConnPool struct {
type streamsPool struct {
// head of list
head *poolConn
head *PoolConn
// busy conns list
busy *poolConn
busy *PoolConn
// the siza of list
count int
// idle conn
idle int
}
type poolConn struct {
type PoolConn struct {
err error
*grpc.ClientConn
next *poolConn
next *PoolConn
pool *ConnPool
sp *streamsPool
pre *poolConn
pre *PoolConn
addr string
streams int
created int64
@ -59,7 +59,7 @@ func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
}
}
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) {
if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:]
}
@ -67,7 +67,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
p.Lock()
sp, ok := p.conns[addr]
if !ok {
sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0}
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
p.conns[addr] = sp
}
// while we have conns check streams and then return one
@ -135,7 +135,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
if err != nil {
return nil, err
}
conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool
p.Lock()
@ -147,7 +147,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil
}
func (p *ConnPool) Put(conn *poolConn, err error) {
func (p *ConnPool) Put(conn *PoolConn, err error) {
p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn
@ -182,11 +182,11 @@ func (p *ConnPool) Put(conn *poolConn, err error) {
p.Unlock()
}
func (conn *poolConn) Close() {
func (conn *PoolConn) Close() {
conn.pool.Put(conn, conn.err)
}
func removeConn(conn *poolConn) {
func removeConn(conn *PoolConn) {
if conn.pre != nil {
conn.pre.next = conn.next
}
@ -199,7 +199,7 @@ func removeConn(conn *poolConn) {
conn.sp.count--
}
func addConnAfter(conn *poolConn, after *poolConn) {
func addConnAfter(conn *PoolConn, after *PoolConn) {
conn.next = after.next
conn.pre = after
if after.next != nil {

@ -10,7 +10,7 @@ import (
)
var (
// DefaultPoolMaxStreams maximum streams on a connectioin
// DefaultPoolMaxStreams maximum streams on a connection
// (20)
DefaultPoolMaxStreams = 20

@ -9,7 +9,7 @@ import (
)
type response struct {
conn *poolConn
conn *PoolConn
stream grpc.ClientStream
codec codec.Codec
}

@ -17,7 +17,7 @@ type grpcStream struct {
request client.Request
response client.Response
close func(err error)
conn *poolConn
conn *PoolConn
sync.RWMutex
closed bool
}