Make pool configurable
This commit is contained in:
		| @@ -66,14 +66,16 @@ type RequestOption func(*RequestOptions) | |||||||
| var ( | var ( | ||||||
| 	// DefaultClient is a default client to use out of the box | 	// DefaultClient is a default client to use out of the box | ||||||
| 	DefaultClient Client = newRpcClient() | 	DefaultClient Client = newRpcClient() | ||||||
|  |  | ||||||
| 	// DefaultBackoff is the default backoff function for retries | 	// DefaultBackoff is the default backoff function for retries | ||||||
| 	DefaultBackoff = exponentialBackoff | 	DefaultBackoff = exponentialBackoff | ||||||
|  |  | ||||||
| 	// DefaultRetries is the default number of times a request is tried | 	// DefaultRetries is the default number of times a request is tried | ||||||
| 	DefaultRetries = 1 | 	DefaultRetries = 1 | ||||||
| 	// DefaultRequestTimeout is the default request timeout | 	// DefaultRequestTimeout is the default request timeout | ||||||
| 	DefaultRequestTimeout = time.Second * 5 | 	DefaultRequestTimeout = time.Second * 5 | ||||||
|  | 	// DefaultPoolSize sets the connection pool size | ||||||
|  | 	DefaultPoolSize = 0 | ||||||
|  | 	// DefaultPoolTTL sets the connection pool ttl | ||||||
|  | 	DefaultPoolTTL = time.Minute | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Makes a synchronous call to a service using the default client | // Makes a synchronous call to a service using the default client | ||||||
|   | |||||||
| @@ -23,6 +23,10 @@ type Options struct { | |||||||
| 	Selector  selector.Selector | 	Selector  selector.Selector | ||||||
| 	Transport transport.Transport | 	Transport transport.Transport | ||||||
|  |  | ||||||
|  | 	// Connection Pool | ||||||
|  | 	PoolSize int | ||||||
|  | 	PoolTTL  time.Duration | ||||||
|  |  | ||||||
| 	// Middleware for client | 	// Middleware for client | ||||||
| 	Wrappers []Wrapper | 	Wrappers []Wrapper | ||||||
|  |  | ||||||
| @@ -74,6 +78,8 @@ func newOptions(options ...Option) Options { | |||||||
| 			RequestTimeout: DefaultRequestTimeout, | 			RequestTimeout: DefaultRequestTimeout, | ||||||
| 			DialTimeout:    transport.DefaultDialTimeout, | 			DialTimeout:    transport.DefaultDialTimeout, | ||||||
| 		}, | 		}, | ||||||
|  | 		PoolSize: DefaultPoolSize, | ||||||
|  | 		PoolTTL:  DefaultPoolTTL, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, o := range options { | 	for _, o := range options { | ||||||
| @@ -126,6 +132,20 @@ func ContentType(ct string) Option { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // PoolSize sets the connection pool size | ||||||
|  | func PoolSize(d int) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.PoolSize = d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // PoolSize sets the connection pool size | ||||||
|  | func PoolTTL(d time.Duration) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.PoolTTL = d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // Registry to find nodes for a given service | // Registry to find nodes for a given service | ||||||
| func Registry(r registry.Registry) Option { | func Registry(r registry.Registry) Option { | ||||||
| 	return func(o *Options) { | 	return func(o *Options) { | ||||||
|   | |||||||
| @@ -28,7 +28,7 @@ func newRpcClient(opt ...Option) Client { | |||||||
| 	rc := &rpcClient{ | 	rc := &rpcClient{ | ||||||
| 		once: sync.Once{}, | 		once: sync.Once{}, | ||||||
| 		opts: opts, | 		opts: opts, | ||||||
| 		pool: newPool(), | 		pool: newPool(opts.PoolSize, opts.PoolTTL), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	c := Client(rc) | 	c := Client(rc) | ||||||
| @@ -178,9 +178,18 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt | |||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcClient) Init(opts ...Option) error { | func (r *rpcClient) Init(opts ...Option) error { | ||||||
|  | 	size := r.opts.PoolSize | ||||||
|  | 	ttl := r.opts.PoolTTL | ||||||
|  |  | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| 		o(&r.opts) | 		o(&r.opts) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// recreate the pool if the options changed | ||||||
|  | 	if size != r.opts.PoolSize || ttl != r.opts.PoolTTL { | ||||||
|  | 		r.pool = newPool(r.opts.PoolSize, r.opts.PoolTTL) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| package client | package client | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"fmt" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| @@ -8,7 +9,8 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| type pool struct { | type pool struct { | ||||||
| 	tr transport.Transport | 	size int | ||||||
|  | 	ttl  int64 | ||||||
|  |  | ||||||
| 	sync.Mutex | 	sync.Mutex | ||||||
| 	conns map[string][]*poolConn | 	conns map[string][]*poolConn | ||||||
| @@ -19,15 +21,10 @@ type poolConn struct { | |||||||
| 	created int64 | 	created int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| var ( | func newPool(size int, ttl time.Duration) *pool { | ||||||
| 	// only hold on to this many conns |  | ||||||
| 	maxIdleConn = 2 |  | ||||||
| 	// only hold on to the conn for this period |  | ||||||
| 	maxLifeTime = int64(60) |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| func newPool() *pool { |  | ||||||
| 	return &pool{ | 	return &pool{ | ||||||
|  | 		size:  size, | ||||||
|  | 		ttl:   int64(ttl.Seconds()), | ||||||
| 		conns: make(map[string][]*poolConn), | 		conns: make(map[string][]*poolConn), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -50,7 +47,7 @@ func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.Di | |||||||
| 		p.conns[addr] = conns | 		p.conns[addr] = conns | ||||||
|  |  | ||||||
| 		// if conn is old kill it and move on | 		// if conn is old kill it and move on | ||||||
| 		if d := now - conn.created; d > maxLifeTime { | 		if d := now - conn.created; d > p.ttl { | ||||||
| 			conn.Client.Close() | 			conn.Client.Close() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| @@ -58,11 +55,14 @@ func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.Di | |||||||
| 		// we got a good conn, lets unlock and return it | 		// we got a good conn, lets unlock and return it | ||||||
| 		p.Unlock() | 		p.Unlock() | ||||||
|  |  | ||||||
|  | 		fmt.Println("old conn") | ||||||
| 		return conn, nil | 		return conn, nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	p.Unlock() | 	p.Unlock() | ||||||
|  |  | ||||||
|  | 	fmt.Println("new conn") | ||||||
|  |  | ||||||
| 	// create new conn | 	// create new conn | ||||||
| 	c, err := tr.Dial(addr, opts...) | 	c, err := tr.Dial(addr, opts...) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -81,7 +81,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) { | |||||||
| 	// otherwise put it back for reuse | 	// otherwise put it back for reuse | ||||||
| 	p.Lock() | 	p.Lock() | ||||||
| 	conns := p.conns[addr] | 	conns := p.conns[addr] | ||||||
| 	if len(conns) >= maxIdleConn { | 	if len(conns) >= p.size { | ||||||
| 		p.Unlock() | 		p.Unlock() | ||||||
| 		conn.Client.Close() | 		conn.Client.Close() | ||||||
| 		return | 		return | ||||||
|   | |||||||
							
								
								
									
										22
									
								
								cmd/cmd.go
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								cmd/cmd.go
									
									
									
									
									
								
							| @@ -62,6 +62,16 @@ var ( | |||||||
| 			EnvVar: "MICRO_CLIENT_RETRIES", | 			EnvVar: "MICRO_CLIENT_RETRIES", | ||||||
| 			Usage:  "Sets the client retries. Default: 1", | 			Usage:  "Sets the client retries. Default: 1", | ||||||
| 		}, | 		}, | ||||||
|  | 		cli.IntFlag{ | ||||||
|  | 			Name:   "client_pool_size", | ||||||
|  | 			EnvVar: "MICRO_CLIENT_POOL_SIZE", | ||||||
|  | 			Usage:  "Sets the client connection pool size. Default: 0", | ||||||
|  | 		}, | ||||||
|  | 		cli.StringFlag{ | ||||||
|  | 			Name:   "client_pool_ttl", | ||||||
|  | 			EnvVar: "MICRO_CLIENT_POOL_TTL", | ||||||
|  | 			Usage:  "Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m", | ||||||
|  | 		}, | ||||||
| 		cli.StringFlag{ | 		cli.StringFlag{ | ||||||
| 			Name:   "server_name", | 			Name:   "server_name", | ||||||
| 			EnvVar: "MICRO_SERVER_NAME", | 			EnvVar: "MICRO_SERVER_NAME", | ||||||
| @@ -337,6 +347,18 @@ func (c *cmd) Before(ctx *cli.Context) error { | |||||||
| 		clientOpts = append(clientOpts, client.RequestTimeout(d)) | 		clientOpts = append(clientOpts, client.RequestTimeout(d)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if r := ctx.Int("client_pool_size"); r > 0 { | ||||||
|  | 		clientOpts = append(clientOpts, client.PoolSize(r)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if t := ctx.String("client_pool_ttl"); len(t) > 0 { | ||||||
|  | 		d, err := time.ParseDuration(t) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("failed to parse client_pool_ttl: %v", t) | ||||||
|  | 		} | ||||||
|  | 		clientOpts = append(clientOpts, client.PoolTTL(d)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// We have some command line opts for the server. | 	// We have some command line opts for the server. | ||||||
| 	// Lets set it up | 	// Lets set it up | ||||||
| 	if len(serverOpts) > 0 { | 	if len(serverOpts) > 0 { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user