diff --git a/grpc.go b/grpc.go index b06b836..d9ffe87 100644 --- a/grpc.go +++ b/grpc.go @@ -30,7 +30,7 @@ const ( ) type grpcClient struct { - pool *pool + pool *ConnPool opts client.Options sync.RWMutex init bool @@ -130,13 +130,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) } - cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) + cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } defer func() { // defer execution of release - g.pool.release(cc, grr) + g.pool.Put(cc, grr) }() ch := make(chan error, 1) @@ -239,7 +239,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) } - cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) + cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -272,7 +272,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request // cancel the context cancel() // release the connection - g.pool.release(cc, err) + g.pool.Put(cc, err) // now return the error return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } @@ -300,7 +300,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request } // defer execution of release - g.pool.release(cc, err) + g.pool.Put(cc, err) }, } @@ -825,7 +825,7 @@ func NewClient(opts ...client.Option) client.Client { opts: options, } - rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) + rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) diff --git a/grpc_pool.go b/grpc_pool.go index f8da979..c31deaf 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -10,7 +10,7 @@ import ( "google.golang.org/grpc/connectivity" ) -type pool struct { +type ConnPool struct { conns map[string]*streamsPool size int ttl int64 @@ -34,7 +34,7 @@ type poolConn struct { err error *grpc.ClientConn next *poolConn - pool *pool + pool *ConnPool sp *streamsPool pre *poolConn addr string @@ -43,14 +43,14 @@ type poolConn struct { in bool } -func newPool(size int, ttl time.Duration, idle int, ms int) *pool { +func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool { if ms <= 0 { ms = 1 } if idle < 0 { idle = 0 } - return &pool{ + return &ConnPool{ size: size, ttl: int64(ttl.Seconds()), maxStreams: ms, @@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { } } -func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { +func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { if strings.HasPrefix(addr, "http") { addr = addr[strings.Index(addr, ":")+3:] } @@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption return conn, nil } -func (p *pool) release(conn *poolConn, err error) { +func (p *ConnPool) Put(conn *poolConn, err error) { p.Lock() p, sp, created := conn.pool, conn.sp, conn.created // try to add conn @@ -183,7 +183,7 @@ func (p *pool) release(conn *poolConn, err error) { } func (conn *poolConn) Close() { - conn.pool.release(conn, conn.err) + conn.pool.Put(conn, conn.err) } func removeConn(conn *poolConn) {