From 2a6a93a7922a893150d0bae0c7fc1a00dd80d49e Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 14 Feb 2024 18:09:07 +0300 Subject: [PATCH 1/3] export grpc conn pool Signed-off-by: Vasiliy Tolstov --- grpc.go | 14 +++++++------- grpc_pool.go | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/grpc.go b/grpc.go index b06b836..d9ffe87 100644 --- a/grpc.go +++ b/grpc.go @@ -30,7 +30,7 @@ const ( ) type grpcClient struct { - pool *pool + pool *ConnPool opts client.Options sync.RWMutex init bool @@ -130,13 +130,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) } - cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) + cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } defer func() { // defer execution of release - g.pool.release(cc, grr) + g.pool.Put(cc, grr) }() ch := make(chan error, 1) @@ -239,7 +239,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request grpcDialOptions = append(grpcDialOptions, grpc.WithContextDialer(contextDialer)) } - cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) + cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -272,7 +272,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request // cancel the context cancel() // release the connection - g.pool.release(cc, err) + g.pool.Put(cc, err) // now return the error return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } @@ -300,7 +300,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request } // defer execution of release - g.pool.release(cc, err) + g.pool.Put(cc, err) }, } @@ -825,7 +825,7 @@ func NewClient(opts ...client.Option) client.Client { opts: options, } - rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) + rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) diff --git a/grpc_pool.go b/grpc_pool.go index f8da979..c31deaf 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -10,7 +10,7 @@ import ( "google.golang.org/grpc/connectivity" ) -type pool struct { +type ConnPool struct { conns map[string]*streamsPool size int ttl int64 @@ -34,7 +34,7 @@ type poolConn struct { err error *grpc.ClientConn next *poolConn - pool *pool + pool *ConnPool sp *streamsPool pre *poolConn addr string @@ -43,14 +43,14 @@ type poolConn struct { in bool } -func newPool(size int, ttl time.Duration, idle int, ms int) *pool { +func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool { if ms <= 0 { ms = 1 } if idle < 0 { idle = 0 } - return &pool{ + return &ConnPool{ size: size, ttl: int64(ttl.Seconds()), maxStreams: ms, @@ -59,7 +59,7 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { } } -func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { +func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { if strings.HasPrefix(addr, "http") { addr = addr[strings.Index(addr, ":")+3:] } @@ -147,7 +147,7 @@ func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption return conn, nil } -func (p *pool) release(conn *poolConn, err error) { +func (p *ConnPool) Put(conn *poolConn, err error) { p.Lock() p, sp, created := conn.pool, conn.sp, conn.created // try to add conn @@ -183,7 +183,7 @@ func (p *pool) release(conn *poolConn, err error) { } func (conn *poolConn) Close() { - conn.pool.release(conn, conn.err) + conn.pool.Put(conn, conn.err) } func removeConn(conn *poolConn) { -- 2.45.2 From 19a469c4e2f08ab59ead13023e99144c137604bb Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 14 Feb 2024 19:14:56 +0300 Subject: [PATCH 2/3] export grpc pool conn Signed-off-by: Vasiliy Tolstov --- grpc_pool.go | 24 ++++++++++++------------ options.go | 2 +- response.go | 2 +- stream.go | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/grpc_pool.go b/grpc_pool.go index c31deaf..f602710 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -21,22 +21,22 @@ type ConnPool struct { type streamsPool struct { // head of list - head *poolConn + head *PoolConn // busy conns list - busy *poolConn + busy *PoolConn // the siza of list count int // idle conn idle int } -type poolConn struct { +type PoolConn struct { err error *grpc.ClientConn - next *poolConn + next *PoolConn pool *ConnPool sp *streamsPool - pre *poolConn + pre *PoolConn addr string streams int created int64 @@ -59,7 +59,7 @@ func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool { } } -func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { +func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) { if strings.HasPrefix(addr, "http") { addr = addr[strings.Index(addr, ":")+3:] } @@ -67,7 +67,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption p.Lock() sp, ok := p.conns[addr] if !ok { - sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} + sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0} p.conns[addr] = sp } // while we have conns check streams and then return one @@ -135,7 +135,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption if err != nil { return nil, err } - conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} + conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} // add conn to streams pool p.Lock() @@ -147,7 +147,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption return conn, nil } -func (p *ConnPool) Put(conn *poolConn, err error) { +func (p *ConnPool) Put(conn *PoolConn, err error) { p.Lock() p, sp, created := conn.pool, conn.sp, conn.created // try to add conn @@ -182,11 +182,11 @@ func (p *ConnPool) Put(conn *poolConn, err error) { p.Unlock() } -func (conn *poolConn) Close() { +func (conn *PoolConn) Close() { conn.pool.Put(conn, conn.err) } -func removeConn(conn *poolConn) { +func removeConn(conn *PoolConn) { if conn.pre != nil { conn.pre.next = conn.next } @@ -199,7 +199,7 @@ func removeConn(conn *poolConn) { conn.sp.count-- } -func addConnAfter(conn *poolConn, after *poolConn) { +func addConnAfter(conn *PoolConn, after *PoolConn) { conn.next = after.next conn.pre = after if after.next != nil { diff --git a/options.go b/options.go index 30fc4a6..65bc8aa 100644 --- a/options.go +++ b/options.go @@ -10,7 +10,7 @@ import ( ) var ( - // DefaultPoolMaxStreams maximum streams on a connectioin + // DefaultPoolMaxStreams maximum streams on a connection // (20) DefaultPoolMaxStreams = 20 diff --git a/response.go b/response.go index 5fbda87..e0b7b27 100644 --- a/response.go +++ b/response.go @@ -9,7 +9,7 @@ import ( ) type response struct { - conn *poolConn + conn *PoolConn stream grpc.ClientStream codec codec.Codec } diff --git a/stream.go b/stream.go index 322e397..8108e17 100644 --- a/stream.go +++ b/stream.go @@ -17,7 +17,7 @@ type grpcStream struct { request client.Request response client.Response close func(err error) - conn *poolConn + conn *PoolConn sync.RWMutex closed bool } -- 2.45.2 From 0f8ead6acc2d36e9a5059b6765b086814069dd8a Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 29 Feb 2024 16:57:38 +0300 Subject: [PATCH 3/3] add defaultConfigService --- .gitignore | 24 ++++++++++++++++++++++++ grpc.go | 16 +++++++++++++++- options.go | 14 ++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9e16696 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +bin + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +# General +.DS_Store +.idea +.vscode \ No newline at end of file diff --git a/grpc.go b/grpc.go index d9ffe87..8d6733e 100644 --- a/grpc.go +++ b/grpc.go @@ -98,6 +98,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, maxRecvMsgSize := g.maxRecvMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue() + cfgService := g.serviceConfig() var grr error @@ -116,6 +117,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), ), + grpc.WithDefaultServiceConfig(cfgService), } if opts := g.getGrpcDialOptions(opts.Context); opts != nil { @@ -218,6 +220,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request maxRecvMsgSize := g.maxRecvMsgSizeValue() maxSendMsgSize := g.maxSendMsgSizeValue() + cfgService := g.serviceConfig() grpcDialOptions := []grpc.DialOption{ g.secure(addr), @@ -225,6 +228,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize), ), + grpc.WithDefaultServiceConfig(cfgService), } if opts := g.getGrpcDialOptions(opts.Context); opts != nil { @@ -369,6 +373,17 @@ func (g *grpcClient) newCodec(ct string) (codec.Codec, error) { return nil, codec.ErrUnknownContentType } +func (g *grpcClient) serviceConfig() string { + if g.opts.Context == nil { + return DefaultServiceConfig + } + v := g.opts.Context.Value(serviceConfigKey{}) + if v == nil { + return DefaultServiceConfig + } + return v.(string) +} + func (g *grpcClient) Init(opts ...client.Option) error { if len(opts) == 0 && g.init { return nil @@ -826,7 +841,6 @@ func NewClient(opts ...client.Option) client.Client { } rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) - c := client.Client(rc) // wrap in reverse diff --git a/options.go b/options.go index 65bc8aa..99e75ea 100644 --- a/options.go +++ b/options.go @@ -25,6 +25,9 @@ var ( // DefaultMaxSendMsgSize maximum message that client can send // (4 MB). DefaultMaxSendMsgSize = 1024 * 1024 * 4 + + // DefaultServiceConfig enable load balancing + DefaultServiceConfig = `{"loadBalancingPolicy":"round_robin"}` ) type poolMaxStreams struct{} @@ -115,3 +118,14 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption { o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts) } } + +type serviceConfigKey struct{} + +func ServiceConfig(str string) client.CallOption { + return func(options *client.CallOptions) { + if options.Context == nil { + options.Context = context.Background() + } + options.Context = context.WithValue(options.Context, serviceConfigKey{}, str) + } +} -- 2.45.2