Compare commits

...

2 Commits

Author SHA1 Message Date
2fbcee7b59 export grpc pool conn
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 19:23:01 +03:00
2adba9d0da export grpc conn pool
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 18:09:07 +03:00
5 changed files with 27 additions and 27 deletions

14
grpc.go
View File

@@ -30,7 +30,7 @@ const (
) )
type grpcClient struct { type grpcClient struct {
pool *pool pool *ConnPool
opts client.Options opts client.Options
sync.RWMutex sync.RWMutex
init bool init bool
@@ -133,13 +133,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
} }
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
g.pool.release(cc, grr) g.pool.Put(cc, grr)
}() }()
ch := make(chan error, 1) ch := make(chan error, 1)
@@ -242,7 +242,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
} }
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@@ -275,7 +275,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context // cancel the context
cancel() cancel()
// release the connection // release the connection
g.pool.release(cc, err) g.pool.Put(cc, err)
// now return the error // now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@@ -303,7 +303,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
} }
// defer execution of release // defer execution of release
g.pool.release(cc, err) g.pool.Put(cc, err)
}, },
} }
@@ -833,7 +833,7 @@ func NewClient(opts ...client.Option) client.Client {
opts: options, opts: options,
} }
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc) c := client.Client(rc)

View File

@@ -10,7 +10,7 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
) )
type pool struct { type ConnPool struct {
conns map[string]*streamsPool conns map[string]*streamsPool
size int size int
ttl int64 ttl int64
@@ -21,36 +21,36 @@ type pool struct {
type streamsPool struct { type streamsPool struct {
// head of list // head of list
head *poolConn head *PoolConn
// busy conns list // busy conns list
busy *poolConn busy *PoolConn
// the siza of list // the siza of list
count int count int
// idle conn // idle conn
idle int idle int
} }
type poolConn struct { type PoolConn struct {
err error err error
*grpc.ClientConn *grpc.ClientConn
next *poolConn next *PoolConn
pool *pool pool *ConnPool
sp *streamsPool sp *streamsPool
pre *poolConn pre *PoolConn
addr string addr string
streams int streams int
created int64 created int64
in bool in bool
} }
func newPool(size int, ttl time.Duration, idle int, ms int) *pool { func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
if ms <= 0 { if ms <= 0 {
ms = 1 ms = 1
} }
if idle < 0 { if idle < 0 {
idle = 0 idle = 0
} }
return &pool{ return &ConnPool{
size: size, size: size,
ttl: int64(ttl.Seconds()), ttl: int64(ttl.Seconds()),
maxStreams: ms, maxStreams: ms,
@@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
} }
} }
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) {
if strings.HasPrefix(addr, "http") { if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:] addr = addr[strings.Index(addr, ":")+3:]
} }
@@ -67,7 +67,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
p.Lock() p.Lock()
sp, ok := p.conns[addr] sp, ok := p.conns[addr]
if !ok { if !ok {
sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
p.conns[addr] = sp p.conns[addr] = sp
} }
// while we have conns check streams and then return one // while we have conns check streams and then return one
@@ -135,7 +135,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool // add conn to streams pool
p.Lock() p.Lock()
@@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil return conn, nil
} }
func (p *pool) release(conn *poolConn, err error) { func (p *ConnPool) Put(conn *PoolConn, err error) {
p.Lock() p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn // try to add conn
@@ -182,11 +182,11 @@ func (p *pool) release(conn *poolConn, err error) {
p.Unlock() p.Unlock()
} }
func (conn *poolConn) Close() { func (conn *PoolConn) Close() {
conn.pool.release(conn, conn.err) conn.pool.Put(conn, conn.err)
} }
func removeConn(conn *poolConn) { func removeConn(conn *PoolConn) {
if conn.pre != nil { if conn.pre != nil {
conn.pre.next = conn.next conn.pre.next = conn.next
} }
@@ -199,7 +199,7 @@ func removeConn(conn *poolConn) {
conn.sp.count-- conn.sp.count--
} }
func addConnAfter(conn *poolConn, after *poolConn) { func addConnAfter(conn *PoolConn, after *PoolConn) {
conn.next = after.next conn.next = after.next
conn.pre = after conn.pre = after
if after.next != nil { if after.next != nil {

View File

@@ -10,7 +10,7 @@ import (
) )
var ( var (
// DefaultPoolMaxStreams maximum streams on a connectioin // DefaultPoolMaxStreams maximum streams on a connection
// (20) // (20)
DefaultPoolMaxStreams = 20 DefaultPoolMaxStreams = 20

View File

@@ -9,7 +9,7 @@ import (
) )
type response struct { type response struct {
conn *poolConn conn *PoolConn
stream grpc.ClientStream stream grpc.ClientStream
codec codec.Codec codec codec.Codec
} }

View File

@@ -17,7 +17,7 @@ type grpcStream struct {
request client.Request request client.Request
response client.Response response client.Response
close func(err error) close func(err error)
conn *poolConn conn *PoolConn
sync.RWMutex sync.RWMutex
closed bool closed bool
} }