From 29fa8de98ed62d90b85f29a1883a684d0ea26dfc Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 11 Jul 2019 01:21:03 +0300 Subject: [PATCH] memory transport: fix race cond on channel close Signed-off-by: Vasiliy Tolstov --- transport/memory/memory.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/transport/memory/memory.go b/transport/memory/memory.go index ed00ce69..274e98c9 100644 --- a/transport/memory/memory.go +++ b/transport/memory/memory.go @@ -12,6 +12,10 @@ import ( "github.com/micro/go-micro/transport" ) +var ( + r = rand.New(rand.NewSource(time.Now().UnixNano())) +) + type memorySocket struct { recv chan *transport.Message send chan *transport.Message @@ -22,6 +26,7 @@ type memorySocket struct { local string remote string + sync.RWMutex } type memoryClient struct { @@ -34,16 +39,18 @@ type memoryListener struct { exit chan bool conn chan *memorySocket opts transport.ListenOptions + sync.RWMutex } type memoryTransport struct { opts transport.Options - - sync.Mutex + sync.RWMutex listeners map[string]*memoryListener } func (ms *memorySocket) Recv(m *transport.Message) error { + ms.RLock() + defer ms.RUnlock() select { case <-ms.exit: return errors.New("connection closed") @@ -64,6 +71,8 @@ func (ms *memorySocket) Remote() string { } func (ms *memorySocket) Send(m *transport.Message) error { + ms.RLock() + defer ms.RUnlock() select { case <-ms.exit: return errors.New("connection closed") @@ -75,6 +84,8 @@ func (ms *memorySocket) Send(m *transport.Message) error { } func (ms *memorySocket) Close() error { + ms.RLock() + defer ms.RUnlock() select { case <-ms.exit: return nil @@ -89,6 +100,8 @@ func (m *memoryListener) Addr() string { } func (m *memoryListener) Close() error { + m.Lock() + defer m.Unlock() select { case <-m.exit: return nil @@ -117,8 +130,8 @@ func (m *memoryListener) Accept(fn func(transport.Socket)) error { } func (m *memoryTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() listener, ok := m.listeners[addr] if !ok { @@ -165,7 +178,6 @@ func (m *memoryTransport) Listen(addr string, opts ...transport.ListenOption) (t // if zero port then randomly assign one if len(parts) > 1 && parts[len(parts)-1] == "0" { - r := rand.New(rand.NewSource(time.Now().UnixNano())) i := r.Intn(10000) // set addr with port addr = fmt.Sprintf("%s:%d", parts[:len(parts)-1], 10000+i)