[v4] hide access to internal mutex (#193)
* changed embedded mutex to private field * changed embedded mutex to private field
This commit is contained in:
12
grpc.go
12
grpc.go
@@ -38,8 +38,8 @@ type grpcClient struct {
|
|||||||
funcStream client.FuncStream
|
funcStream client.FuncStream
|
||||||
pool *ConnPool
|
pool *ConnPool
|
||||||
opts client.Options
|
opts client.Options
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
init bool
|
init bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// secure returns the dial option for whether its a secure or insecure connection
|
// secure returns the dial option for whether its a secure or insecure connection
|
||||||
@@ -361,8 +361,8 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
|
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
defer g.RUnlock()
|
defer g.mu.RUnlock()
|
||||||
|
|
||||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||||
ct = ct[:idx]
|
ct = ct[:idx]
|
||||||
@@ -398,10 +398,10 @@ func (g *grpcClient) Init(opts ...client.Option) error {
|
|||||||
|
|
||||||
// update pool configuration if the options changed
|
// update pool configuration if the options changed
|
||||||
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
||||||
g.pool.Lock()
|
g.pool.mu.Lock()
|
||||||
g.pool.size = g.opts.PoolSize
|
g.pool.size = g.opts.PoolSize
|
||||||
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
||||||
g.pool.Unlock()
|
g.pool.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
g.funcCall = g.fnCall
|
g.funcCall = g.fnCall
|
||||||
|
20
grpc_pool.go
20
grpc_pool.go
@@ -16,7 +16,7 @@ type ConnPool struct {
|
|||||||
ttl int64
|
ttl int64
|
||||||
maxStreams int
|
maxStreams int
|
||||||
maxIdle int
|
maxIdle int
|
||||||
sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamsPool struct {
|
type streamsPool struct {
|
||||||
@@ -64,7 +64,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
addr = addr[strings.Index(addr, ":")+3:]
|
addr = addr[strings.Index(addr, ":")+3:]
|
||||||
}
|
}
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
p.Lock()
|
p.mu.Lock()
|
||||||
sp, ok := p.conns[addr]
|
sp, ok := p.conns[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
||||||
@@ -125,10 +125,10 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
}
|
}
|
||||||
// a good conn
|
// a good conn
|
||||||
conn.streams++
|
conn.streams++
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
// nolint (TODO need fix) create new conn)
|
// nolint (TODO need fix) create new conn)
|
||||||
cc, err := grpc.DialContext(ctx, addr, opts...)
|
cc, err := grpc.DialContext(ctx, addr, opts...)
|
||||||
@@ -138,24 +138,24 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
||||||
|
|
||||||
// add conn to streams pool
|
// add conn to streams pool
|
||||||
p.Lock()
|
p.mu.Lock()
|
||||||
if sp.count < p.size {
|
if sp.count < p.size {
|
||||||
addConnAfter(conn, sp.head)
|
addConnAfter(conn, sp.head)
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
||||||
p.Lock()
|
p.mu.Lock()
|
||||||
p, sp, created := conn.pool, conn.sp, conn.created
|
p, sp, created := conn.pool, conn.sp, conn.created
|
||||||
// try to add conn
|
// try to add conn
|
||||||
if !conn.in && sp.count < p.size {
|
if !conn.in && sp.count < p.size {
|
||||||
addConnAfter(conn, sp.head)
|
addConnAfter(conn, sp.head)
|
||||||
}
|
}
|
||||||
if !conn.in {
|
if !conn.in {
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -173,13 +173,13 @@ func (p *ConnPool) Put(conn *PoolConn, err error) {
|
|||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
||||||
removeConn(conn)
|
removeConn(conn)
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sp.idle++
|
sp.idle++
|
||||||
}
|
}
|
||||||
p.Unlock()
|
p.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *PoolConn) Close() {
|
func (conn *PoolConn) Close() {
|
||||||
|
20
stream.go
20
stream.go
@@ -19,8 +19,8 @@ type grpcStream struct {
|
|||||||
response client.Response
|
response client.Response
|
||||||
close func(err error)
|
close func(err error)
|
||||||
conn *PoolConn
|
conn *PoolConn
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Context() context.Context {
|
func (g *grpcStream) Context() context.Context {
|
||||||
@@ -88,15 +88,15 @@ func (g *grpcStream) RecvMsg(msg interface{}) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Error() error {
|
func (g *grpcStream) Error() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
defer g.RUnlock()
|
defer g.mu.RUnlock()
|
||||||
return g.err
|
return g.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) setError(e error) {
|
func (g *grpcStream) setError(e error) {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.err = e
|
g.err = e
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the gRPC send stream
|
// Close the gRPC send stream
|
||||||
@@ -105,8 +105,8 @@ func (g *grpcStream) setError(e error) {
|
|||||||
// stream should still be able to receive after this function call
|
// stream should still be able to receive after this function call
|
||||||
// TODO: should the conn be closed in another way?
|
// TODO: should the conn be closed in another way?
|
||||||
func (g *grpcStream) Close() error {
|
func (g *grpcStream) Close() error {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
if g.closed {
|
if g.closed {
|
||||||
return nil
|
return nil
|
||||||
@@ -125,8 +125,8 @@ func (g *grpcStream) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) CloseSend() error {
|
func (g *grpcStream) CloseSend() error {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
if g.closed {
|
if g.closed {
|
||||||
return nil
|
return nil
|
||||||
|
Reference in New Issue
Block a user