issue_132 #134

Merged
vtolstov merged 4 commits from devstigneev/micro-client-grpc:issue_132 into v3 2024-02-29 23:07:34 +03:00
2 changed files with 14 additions and 14 deletions
Showing only changes of commit 2a6a93a792 - Show all commits

14
grpc.go
View File

@ -30,7 +30,7 @@ const (
)
type grpcClient struct {
pool *pool
pool *ConnPool
opts client.Options
sync.RWMutex
init bool
@ -130,13 +130,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
}
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
defer func() {
// defer execution of release
g.pool.release(cc, grr)
g.pool.Put(cc, grr)
}()
ch := make(chan error, 1)
@ -239,7 +239,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer))
}
cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...)
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
}
@ -272,7 +272,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// cancel the context
cancel()
// release the connection
g.pool.release(cc, err)
g.pool.Put(cc, err)
// now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
}
@ -300,7 +300,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
}
// defer execution of release
g.pool.release(cc, err)
g.pool.Put(cc, err)
},
}
@ -825,7 +825,7 @@ func NewClient(opts ...client.Option) client.Client {
opts: options,
}
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c := client.Client(rc)

View File

@ -10,7 +10,7 @@ import (
"google.golang.org/grpc/connectivity"
)
type pool struct {
type ConnPool struct {
conns map[string]*streamsPool
size int
ttl int64
@ -34,7 +34,7 @@ type poolConn struct {
err error
*grpc.ClientConn
next *poolConn
pool *pool
pool *ConnPool
sp *streamsPool
pre *poolConn
addr string
@ -43,14 +43,14 @@ type poolConn struct {
in bool
}
func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
if ms <= 0 {
ms = 1
}
if idle < 0 {
idle = 0
}
return &pool{
return &ConnPool{
size: size,
ttl: int64(ttl.Seconds()),
maxStreams: ms,
@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool {
}
}
func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:]
}
@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil
}
func (p *pool) release(conn *poolConn, err error) {
func (p *ConnPool) Put(conn *poolConn, err error) {
p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn
@ -183,7 +183,7 @@ func (p *pool) release(conn *poolConn, err error) {
}
func (conn *poolConn) Close() {
conn.pool.release(conn, conn.err)
conn.pool.Put(conn, conn.err)
}
func removeConn(conn *poolConn) {