From 372bb37779ae28a9b9e04fdb20052f2e6bbcf18b Mon Sep 17 00:00:00 2001 From: pugnack Date: Sun, 25 May 2025 03:14:55 +0500 Subject: [PATCH] [v4] hide access to internal mutex (#193) * changed embedded mutex to private field * changed embedded mutex to private field --- grpc.go | 12 ++++++------ grpc_pool.go | 20 ++++++++++---------- stream.go | 20 ++++++++++---------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/grpc.go b/grpc.go index f999999..17ae0a2 100644 --- a/grpc.go +++ b/grpc.go @@ -38,8 +38,8 @@ type grpcClient struct { funcStream client.FuncStream pool *ConnPool opts client.Options - sync.RWMutex - init bool + mu sync.RWMutex + init bool } // secure returns the dial option for whether its a secure or insecure connection @@ -361,8 +361,8 @@ func (g *grpcClient) maxSendMsgSizeValue() int { } func (g *grpcClient) newCodec(ct string) (codec.Codec, error) { - g.RLock() - defer g.RUnlock() + g.mu.RLock() + defer g.mu.RUnlock() if idx := strings.IndexRune(ct, ';'); idx >= 0 { ct = ct[:idx] @@ -398,10 +398,10 @@ func (g *grpcClient) Init(opts ...client.Option) error { // update pool configuration if the options changed if size != g.opts.PoolSize || ttl != g.opts.PoolTTL { - g.pool.Lock() + g.pool.mu.Lock() g.pool.size = g.opts.PoolSize g.pool.ttl = int64(g.opts.PoolTTL.Seconds()) - g.pool.Unlock() + g.pool.mu.Unlock() } g.funcCall = g.fnCall diff --git a/grpc_pool.go b/grpc_pool.go index 026e222..4a7015d 100644 --- a/grpc_pool.go +++ b/grpc_pool.go @@ -16,7 +16,7 @@ type ConnPool struct { ttl int64 maxStreams int maxIdle int - sync.Mutex + mu sync.Mutex } type streamsPool struct { @@ -64,7 +64,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption addr = addr[strings.Index(addr, ":")+3:] } now := time.Now().Unix() - p.Lock() + p.mu.Lock() sp, ok := p.conns[addr] if !ok { sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0} @@ -125,10 +125,10 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption } // a good conn conn.streams++ - p.Unlock() + p.mu.Unlock() return conn, nil } - p.Unlock() + p.mu.Unlock() // nolint (TODO need fix) create new conn) cc, err := grpc.DialContext(ctx, addr, opts...) @@ -138,24 +138,24 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} // add conn to streams pool - p.Lock() + p.mu.Lock() if sp.count < p.size { addConnAfter(conn, sp.head) } - p.Unlock() + p.mu.Unlock() return conn, nil } func (p *ConnPool) Put(conn *PoolConn, err error) { - p.Lock() + p.mu.Lock() p, sp, created := conn.pool, conn.sp, conn.created // try to add conn if !conn.in && sp.count < p.size { addConnAfter(conn, sp.head) } if !conn.in { - p.Unlock() + p.mu.Unlock() conn.ClientConn.Close() return } @@ -173,13 +173,13 @@ func (p *ConnPool) Put(conn *PoolConn, err error) { now := time.Now().Unix() if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { removeConn(conn) - p.Unlock() + p.mu.Unlock() conn.ClientConn.Close() return } sp.idle++ } - p.Unlock() + p.mu.Unlock() } func (conn *PoolConn) Close() { diff --git a/stream.go b/stream.go index b591017..52dd79b 100644 --- a/stream.go +++ b/stream.go @@ -19,8 +19,8 @@ type grpcStream struct { response client.Response close func(err error) conn *PoolConn - sync.RWMutex - closed bool + mu sync.RWMutex + closed bool } func (g *grpcStream) Context() context.Context { @@ -88,15 +88,15 @@ func (g *grpcStream) RecvMsg(msg interface{}) (err error) { } func (g *grpcStream) Error() error { - g.RLock() - defer g.RUnlock() + g.mu.RLock() + defer g.mu.RUnlock() return g.err } func (g *grpcStream) setError(e error) { - g.Lock() + g.mu.Lock() g.err = e - g.Unlock() + g.mu.Unlock() } // Close the gRPC send stream @@ -105,8 +105,8 @@ func (g *grpcStream) setError(e error) { // stream should still be able to receive after this function call // TODO: should the conn be closed in another way? func (g *grpcStream) Close() error { - g.Lock() - defer g.Unlock() + g.mu.Lock() + defer g.mu.Unlock() if g.closed { return nil @@ -125,8 +125,8 @@ func (g *grpcStream) Close() error { } func (g *grpcStream) CloseSend() error { - g.Lock() - defer g.Unlock() + g.mu.Lock() + defer g.mu.Unlock() if g.closed { return nil