Move connection pool to own package

This commit is contained in:
Asim Aslam 2019-07-28 18:56:18 +01:00
parent adb6760e21
commit f2669e7b1e
6 changed files with 215 additions and 103 deletions

114
client/pool/default.go Normal file
View File

@ -0,0 +1,114 @@
package pool
import (
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/transport"
)
type pool struct {
size int
ttl time.Duration
tr transport.Transport
sync.Mutex
conns map[string][]*poolConn
}
type poolConn struct {
transport.Client
id string
created time.Time
}
func newPool(options Options) *pool {
return &pool{
size: options.Size,
tr: options.Transport,
ttl: options.TTL,
conns: make(map[string][]*poolConn),
}
}
func (p *pool) Close() error {
p.Lock()
for k, c := range p.conns {
for _, conn := range c {
conn.Client.Close()
}
delete(p.conns, k)
}
p.Unlock()
return nil
}
// NoOp the Close since we manage it
func (p *poolConn) Close() error {
return nil
}
func (p *poolConn) Id() string {
return p.id
}
func (p *poolConn) Created() time.Time {
return p.created
}
func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) {
p.Lock()
conns := p.conns[addr]
// while we have conns check age and then return one
// otherwise we'll create a new conn
for len(conns) > 0 {
conn := conns[len(conns)-1]
conns = conns[:len(conns)-1]
p.conns[addr] = conns
// if conn is old kill it and move on
if d := time.Since(conn.Created()); d > p.ttl {
conn.Client.Close()
continue
}
// we got a good conn, lets unlock and return it
p.Unlock()
return conn, nil
}
p.Unlock()
// create new conn
c, err := p.tr.Dial(addr, opts...)
if err != nil {
return nil, err
}
return &poolConn{
Client: c,
id: uuid.New().String(),
created: time.Now(),
}, nil
}
func (p *pool) Release(conn Conn, err error) error {
// don't store the conn if it has errored
if err != nil {
return conn.(*poolConn).Client.Close()
}
// otherwise put it back for reuse
p.Lock()
conns := p.conns[conn.Remote()]
if len(conns) >= p.size {
p.Unlock()
return conn.(*poolConn).Client.Close()
}
p.conns[conn.Remote()] = append(conns, conn.(*poolConn))
p.Unlock()
return nil
}

View File

@ -1,4 +1,4 @@
package client package pool
import ( import (
"testing" "testing"
@ -9,12 +9,17 @@ import (
) )
func testPool(t *testing.T, size int, ttl time.Duration) { func testPool(t *testing.T, size int, ttl time.Duration) {
// zero pool
p := newPool(size, ttl)
// mock transport // mock transport
tr := memory.NewTransport() tr := memory.NewTransport()
options := Options{
TTL: ttl,
Size: size,
Transport: tr,
}
// zero pool
p := newPool(options)
// listen // listen
l, err := tr.Listen(":0") l, err := tr.Listen(":0")
if err != nil { if err != nil {
@ -43,7 +48,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
// get a conn // get a conn
c, err := p.getConn(l.Addr(), tr) c, err := p.Get(l.Addr())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -67,7 +72,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
} }
// release the conn // release the conn
p.release(l.Addr(), c, nil) p.Release(c, nil)
p.Lock() p.Lock()
if i := len(p.conns[l.Addr()]); i > size { if i := len(p.conns[l.Addr()]); i > size {
@ -78,7 +83,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) {
} }
} }
func TestRPCPool(t *testing.T) { func TestClientPool(t *testing.T) {
testPool(t, 0, time.Minute) testPool(t, 0, time.Minute)
testPool(t, 2, time.Minute) testPool(t, 2, time.Minute)
} }

33
client/pool/options.go Normal file
View File

@ -0,0 +1,33 @@
package pool
import (
"time"
"github.com/micro/go-micro/transport"
)
type Options struct {
Transport transport.Transport
TTL time.Duration
Size int
}
type Option func(*Options)
func Size(i int) Option {
return func(o *Options) {
o.Size = i
}
}
func Transport(t transport.Transport) Option {
return func(o *Options) {
o.Transport = t
}
}
func TTL(t time.Duration) Option {
return func(o *Options) {
o.TTL = t
}
}

35
client/pool/pool.go Normal file
View File

@ -0,0 +1,35 @@
// Package pool is a connection pool
package pool
import (
"time"
"github.com/micro/go-micro/transport"
)
// Pool is an interface for connection pooling
type Pool interface {
// Close the pool
Close() error
// Get a connection
Get(addr string, opts ...transport.DialOption) (Conn, error)
// Releaes the connection
Release(c Conn, status error) error
}
type Conn interface {
// unique id of connection
Id() string
// time it was created
Created() time.Time
// embedded connection
transport.Client
}
func NewPool(opts ...Option) Pool {
var options Options
for _, o := range opts {
o(&options)
}
return newPool(options)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client/pool"
"github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
@ -22,17 +23,23 @@ import (
type rpcClient struct { type rpcClient struct {
once sync.Once once sync.Once
opts Options opts Options
pool *pool pool pool.Pool
seq uint64 seq uint64
} }
func newRpcClient(opt ...Option) Client { func newRpcClient(opt ...Option) Client {
opts := newOptions(opt...) opts := newOptions(opt...)
p := pool.NewPool(
pool.Size(opts.PoolSize),
pool.TTL(opts.PoolTTL),
pool.Transport(opts.Transport),
)
rc := &rpcClient{ rc := &rpcClient{
once: sync.Once{}, once: sync.Once{},
opts: opts, opts: opts,
pool: newPool(opts.PoolSize, opts.PoolTTL), pool: p,
seq: 0, seq: 0,
} }
@ -90,13 +97,13 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request,
} }
var grr error var grr error
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout)) c, err := r.pool.Get(address, transport.WithTimeout(opts.DialTimeout))
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", "connection error: %v", err) return errors.InternalServerError("go.micro.client", "connection error: %v", err)
} }
defer func() { defer func() {
// defer execution of release // defer execution of release
r.pool.release(address, c, grr) r.pool.Release(c, grr)
}() }()
seq := atomic.LoadUint64(&r.seq) seq := atomic.LoadUint64(&r.seq)
@ -245,17 +252,22 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
func (r *rpcClient) Init(opts ...Option) error { func (r *rpcClient) Init(opts ...Option) error {
size := r.opts.PoolSize size := r.opts.PoolSize
ttl := r.opts.PoolTTL ttl := r.opts.PoolTTL
tr := r.opts.Transport
for _, o := range opts { for _, o := range opts {
o(&r.opts) o(&r.opts)
} }
// update pool configuration if the options changed // update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL { if size != r.opts.PoolSize || ttl != r.opts.PoolTTL || tr != r.opts.Transport {
r.pool.Lock() // close existing pool
r.pool.size = r.opts.PoolSize r.pool.Close()
r.pool.ttl = int64(r.opts.PoolTTL.Seconds()) // create new pool
r.pool.Unlock() r.pool = pool.NewPool(
pool.Size(r.opts.PoolSize),
pool.TTL(r.opts.PoolTTL),
pool.Transport(r.opts.Transport),
)
} }
return nil return nil

View File

@ -1,87 +0,0 @@
package client
import (
"sync"
"time"
"github.com/micro/go-micro/transport"
)
type pool struct {
size int
ttl int64
sync.Mutex
conns map[string][]*poolConn
}
type poolConn struct {
transport.Client
created int64
}
func newPool(size int, ttl time.Duration) *pool {
return &pool{
size: size,
ttl: int64(ttl.Seconds()),
conns: make(map[string][]*poolConn),
}
}
// NoOp the Close since we manage it
func (p *poolConn) Close() error {
return nil
}
func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) {
p.Lock()
conns := p.conns[addr]
now := time.Now().Unix()
// while we have conns check age and then return one
// otherwise we'll create a new conn
for len(conns) > 0 {
conn := conns[len(conns)-1]
conns = conns[:len(conns)-1]
p.conns[addr] = conns
// if conn is old kill it and move on
if d := now - conn.created; d > p.ttl {
conn.Client.Close()
continue
}
// we got a good conn, lets unlock and return it
p.Unlock()
return conn, nil
}
p.Unlock()
// create new conn
c, err := tr.Dial(addr, opts...)
if err != nil {
return nil, err
}
return &poolConn{c, time.Now().Unix()}, nil
}
func (p *pool) release(addr string, conn *poolConn, err error) {
// don't store the conn if it has errored
if err != nil {
conn.Client.Close()
return
}
// otherwise put it back for reuse
p.Lock()
conns := p.conns[addr]
if len(conns) >= p.size {
p.Unlock()
conn.Client.Close()
return
}
p.conns[addr] = append(conns, conn)
p.Unlock()
}