From 7d5e31cd581c67bdaad9c5ba65fa022b3724048f Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 29 Dec 2019 21:07:55 +0000 Subject: [PATCH] Moving to gRPC by default (#1069) * Step 1 * Fix the test panics --- grpc.go | 36 ++++-------------------------------- grpc_pool.go | 48 ++++++++++++++++++++++++------------------------ options.go | 4 ++-- 3 files changed, 30 insertions(+), 58 deletions(-) diff --git a/grpc.go b/grpc.go index 792563d..e8e23a3 100644 --- a/grpc.go +++ b/grpc.go @@ -12,12 +12,10 @@ import ( "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" "github.com/micro/go-micro/client/selector" - "github.com/micro/go-micro/codec" raw "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/transport" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -623,45 +621,19 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { } func newClient(opts ...client.Option) client.Client { - options := client.Options{ - Codecs: make(map[string]codec.NewCodec), - CallOptions: client.CallOptions{ - Backoff: client.DefaultBackoff, - Retry: client.DefaultRetry, - Retries: client.DefaultRetries, - RequestTimeout: client.DefaultRequestTimeout, - DialTimeout: transport.DefaultDialTimeout, - }, - PoolSize: client.DefaultPoolSize, - PoolTTL: client.DefaultPoolTTL, - } + options := client.NewOptions() + // default content type for grpc + options.ContentType = "application/grpc+proto" for _, o := range opts { o(&options) } - if len(options.ContentType) == 0 { - options.ContentType = "application/grpc+proto" - } - - if options.Broker == nil { - options.Broker = broker.DefaultBroker - } - - if options.Registry == nil { - options.Registry = registry.DefaultRegistry - } - - if options.Selector == nil { - options.Selector = selector.NewSelector( - selector.Registry(options.Registry), - ) - } - rc := &grpcClient{ once: sync.Once{}, opts: options, } + rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams()) c := client.Client(rc) diff --git a/grpc_pool.go b/grpc_pool.go index ee5bb87..340aecc 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -14,7 +14,7 @@ type pool struct { // max streams on a *poolConn maxStreams int // max idle conns - maxIdle int + maxIdle int sync.Mutex conns map[string]*streamsPool @@ -22,20 +22,20 @@ type pool struct { type streamsPool struct { // head of list - head *poolConn + head *poolConn // busy conns list - busy *poolConn + busy *poolConn // the siza of list - count int + count int // idle conn - idle int + idle int } type poolConn struct { // grpc conn *grpc.ClientConn - err error - addr string + err error + addr string // pool and streams pool pool *pool @@ -44,9 +44,9 @@ type poolConn struct { created int64 // list - pre *poolConn - next *poolConn - in bool + pre *poolConn + next *poolConn + in bool } func newPool(size int, ttl time.Duration, idle int, ms int) *pool { @@ -57,11 +57,11 @@ func newPool(size int, ttl time.Duration, idle int, ms int) *pool { idle = 0 } return &pool{ - size: size, - ttl: int64(ttl.Seconds()), + size: size, + ttl: int64(ttl.Seconds()), maxStreams: ms, - maxIdle: idle, - conns: make(map[string]*streamsPool), + maxIdle: idle, + conns: make(map[string]*streamsPool), } } @@ -70,7 +70,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) p.Lock() sp, ok := p.conns[addr] if !ok { - sp = &streamsPool{head:&poolConn{}, busy:&poolConn{}, count:0, idle:0} + sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} p.conns[addr] = sp } // while we have conns check streams and then return one @@ -90,10 +90,10 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) } // a busy conn if conn.streams >= p.maxStreams { - next := conn.next - removeConn(conn) - addConnAfter(conn, sp.busy) - conn = next + next := conn.next + removeConn(conn) + addConnAfter(conn, sp.busy) + conn = next continue } // a idle conn @@ -112,7 +112,7 @@ func (p *pool) getConn(addr string, opts ...grpc.DialOption) (*poolConn, error) if err != nil { return nil, err } - conn = &poolConn{cc,nil,addr,p,sp,1,time.Now().Unix(), nil, nil, false} + conn = &poolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false} // add conn to streams pool p.Lock() @@ -148,7 +148,7 @@ func (p *pool) release(addr string, conn *poolConn, err error) { // 2. too many idle conn or // 3. conn is too old now := time.Now().Unix() - if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { + if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { removeConn(conn) p.Unlock() conn.ClientConn.Close() @@ -160,11 +160,11 @@ func (p *pool) release(addr string, conn *poolConn, err error) { return } -func (conn *poolConn)Close() { +func (conn *poolConn) Close() { conn.pool.release(conn.addr, conn, conn.err) } -func removeConn(conn *poolConn) { +func removeConn(conn *poolConn) { if conn.pre != nil { conn.pre.next = conn.next } @@ -178,7 +178,7 @@ func removeConn(conn *poolConn) { return } -func addConnAfter(conn *poolConn, after *poolConn) { +func addConnAfter(conn *poolConn, after *poolConn) { conn.next = after.next conn.pre = after if after.next != nil { diff --git a/options.go b/options.go index c3862c7..146e42b 100644 --- a/options.go +++ b/options.go @@ -28,8 +28,8 @@ var ( DefaultMaxSendMsgSize = 1024 * 1024 * 4 ) -type poolMaxStreams struct {} -type poolMaxIdle struct {} +type poolMaxStreams struct{} +type poolMaxIdle struct{} type codecsKey struct{} type tlsAuth struct{} type maxRecvMsgSizeKey struct{}