diff --git a/client/pool/default.go b/client/pool/default.go new file mode 100644 index 00000000..d625289c --- /dev/null +++ b/client/pool/default.go @@ -0,0 +1,114 @@ +package pool + +import ( + "sync" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/transport" +) + +type pool struct { + size int + ttl time.Duration + tr transport.Transport + + sync.Mutex + conns map[string][]*poolConn +} + +type poolConn struct { + transport.Client + id string + created time.Time +} + +func newPool(options Options) *pool { + return &pool{ + size: options.Size, + tr: options.Transport, + ttl: options.TTL, + conns: make(map[string][]*poolConn), + } +} + +func (p *pool) Close() error { + p.Lock() + for k, c := range p.conns { + for _, conn := range c { + conn.Client.Close() + } + delete(p.conns, k) + } + p.Unlock() + return nil +} + +// NoOp the Close since we manage it +func (p *poolConn) Close() error { + return nil +} + +func (p *poolConn) Id() string { + return p.id +} + +func (p *poolConn) Created() time.Time { + return p.created +} + +func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) { + p.Lock() + conns := p.conns[addr] + + // while we have conns check age and then return one + // otherwise we'll create a new conn + for len(conns) > 0 { + conn := conns[len(conns)-1] + conns = conns[:len(conns)-1] + p.conns[addr] = conns + + // if conn is old kill it and move on + if d := time.Since(conn.Created()); d > p.ttl { + conn.Client.Close() + continue + } + + // we got a good conn, lets unlock and return it + p.Unlock() + + return conn, nil + } + + p.Unlock() + + // create new conn + c, err := p.tr.Dial(addr, opts...) + if err != nil { + return nil, err + } + return &poolConn{ + Client: c, + id: uuid.New().String(), + created: time.Now(), + }, nil +} + +func (p *pool) Release(conn Conn, err error) error { + // don't store the conn if it has errored + if err != nil { + return conn.(*poolConn).Client.Close() + } + + // otherwise put it back for reuse + p.Lock() + conns := p.conns[conn.Remote()] + if len(conns) >= p.size { + p.Unlock() + return conn.(*poolConn).Client.Close() + } + p.conns[conn.Remote()] = append(conns, conn.(*poolConn)) + p.Unlock() + + return nil +} diff --git a/client/rpc_pool_test.go b/client/pool/default_test.go similarity index 86% rename from client/rpc_pool_test.go rename to client/pool/default_test.go index 6c5875f1..4823e228 100644 --- a/client/rpc_pool_test.go +++ b/client/pool/default_test.go @@ -1,4 +1,4 @@ -package client +package pool import ( "testing" @@ -9,12 +9,17 @@ import ( ) func testPool(t *testing.T, size int, ttl time.Duration) { - // zero pool - p := newPool(size, ttl) - // mock transport tr := memory.NewTransport() + options := Options{ + TTL: ttl, + Size: size, + Transport: tr, + } + // zero pool + p := newPool(options) + // listen l, err := tr.Listen(":0") if err != nil { @@ -43,7 +48,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) { for i := 0; i < 10; i++ { // get a conn - c, err := p.getConn(l.Addr(), tr) + c, err := p.Get(l.Addr()) if err != nil { t.Fatal(err) } @@ -67,7 +72,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) { } // release the conn - p.release(l.Addr(), c, nil) + p.Release(c, nil) p.Lock() if i := len(p.conns[l.Addr()]); i > size { @@ -78,7 +83,7 @@ func testPool(t *testing.T, size int, ttl time.Duration) { } } -func TestRPCPool(t *testing.T) { +func TestClientPool(t *testing.T) { testPool(t, 0, time.Minute) testPool(t, 2, time.Minute) } diff --git a/client/pool/options.go b/client/pool/options.go new file mode 100644 index 00000000..ed3feda7 --- /dev/null +++ b/client/pool/options.go @@ -0,0 +1,33 @@ +package pool + +import ( + "time" + + "github.com/micro/go-micro/transport" +) + +type Options struct { + Transport transport.Transport + TTL time.Duration + Size int +} + +type Option func(*Options) + +func Size(i int) Option { + return func(o *Options) { + o.Size = i + } +} + +func Transport(t transport.Transport) Option { + return func(o *Options) { + o.Transport = t + } +} + +func TTL(t time.Duration) Option { + return func(o *Options) { + o.TTL = t + } +} diff --git a/client/pool/pool.go b/client/pool/pool.go new file mode 100644 index 00000000..79dd1a64 --- /dev/null +++ b/client/pool/pool.go @@ -0,0 +1,35 @@ +// Package pool is a connection pool +package pool + +import ( + "time" + + "github.com/micro/go-micro/transport" +) + +// Pool is an interface for connection pooling +type Pool interface { + // Close the pool + Close() error + // Get a connection + Get(addr string, opts ...transport.DialOption) (Conn, error) + // Releaes the connection + Release(c Conn, status error) error +} + +type Conn interface { + // unique id of connection + Id() string + // time it was created + Created() time.Time + // embedded connection + transport.Client +} + +func NewPool(opts ...Option) Pool { + var options Options + for _, o := range opts { + o(&options) + } + return newPool(options) +} diff --git a/client/rpc_client.go b/client/rpc_client.go index 39d044d9..724e3b89 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/client/pool" "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/errors" @@ -22,17 +23,23 @@ import ( type rpcClient struct { once sync.Once opts Options - pool *pool + pool pool.Pool seq uint64 } func newRpcClient(opt ...Option) Client { opts := newOptions(opt...) + p := pool.NewPool( + pool.Size(opts.PoolSize), + pool.TTL(opts.PoolTTL), + pool.Transport(opts.Transport), + ) + rc := &rpcClient{ once: sync.Once{}, opts: opts, - pool: newPool(opts.PoolSize, opts.PoolTTL), + pool: p, seq: 0, } @@ -90,13 +97,13 @@ func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, } var grr error - c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout)) + c, err := r.pool.Get(address, transport.WithTimeout(opts.DialTimeout)) if err != nil { return errors.InternalServerError("go.micro.client", "connection error: %v", err) } defer func() { // defer execution of release - r.pool.release(address, c, grr) + r.pool.Release(c, grr) }() seq := atomic.LoadUint64(&r.seq) @@ -245,17 +252,22 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request func (r *rpcClient) Init(opts ...Option) error { size := r.opts.PoolSize ttl := r.opts.PoolTTL + tr := r.opts.Transport for _, o := range opts { o(&r.opts) } // update pool configuration if the options changed - if size != r.opts.PoolSize || ttl != r.opts.PoolTTL { - r.pool.Lock() - r.pool.size = r.opts.PoolSize - r.pool.ttl = int64(r.opts.PoolTTL.Seconds()) - r.pool.Unlock() + if size != r.opts.PoolSize || ttl != r.opts.PoolTTL || tr != r.opts.Transport { + // close existing pool + r.pool.Close() + // create new pool + r.pool = pool.NewPool( + pool.Size(r.opts.PoolSize), + pool.TTL(r.opts.PoolTTL), + pool.Transport(r.opts.Transport), + ) } return nil diff --git a/client/rpc_pool.go b/client/rpc_pool.go deleted file mode 100644 index 9fdd7736..00000000 --- a/client/rpc_pool.go +++ /dev/null @@ -1,87 +0,0 @@ -package client - -import ( - "sync" - "time" - - "github.com/micro/go-micro/transport" -) - -type pool struct { - size int - ttl int64 - - sync.Mutex - conns map[string][]*poolConn -} - -type poolConn struct { - transport.Client - created int64 -} - -func newPool(size int, ttl time.Duration) *pool { - return &pool{ - size: size, - ttl: int64(ttl.Seconds()), - conns: make(map[string][]*poolConn), - } -} - -// NoOp the Close since we manage it -func (p *poolConn) Close() error { - return nil -} - -func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) { - p.Lock() - conns := p.conns[addr] - now := time.Now().Unix() - - // while we have conns check age and then return one - // otherwise we'll create a new conn - for len(conns) > 0 { - conn := conns[len(conns)-1] - conns = conns[:len(conns)-1] - p.conns[addr] = conns - - // if conn is old kill it and move on - if d := now - conn.created; d > p.ttl { - conn.Client.Close() - continue - } - - // we got a good conn, lets unlock and return it - p.Unlock() - - return conn, nil - } - - p.Unlock() - - // create new conn - c, err := tr.Dial(addr, opts...) - if err != nil { - return nil, err - } - return &poolConn{c, time.Now().Unix()}, nil -} - -func (p *pool) release(addr string, conn *poolConn, err error) { - // don't store the conn if it has errored - if err != nil { - conn.Client.Close() - return - } - - // otherwise put it back for reuse - p.Lock() - conns := p.conns[addr] - if len(conns) >= p.size { - p.Unlock() - conn.Client.Close() - return - } - p.conns[addr] = append(conns, conn) - p.Unlock() -}