memory transport: fix race cond on channel close
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -12,6 +12,10 @@ import ( | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	r = rand.New(rand.NewSource(time.Now().UnixNano())) | ||||
| ) | ||||
|  | ||||
| type memorySocket struct { | ||||
| 	recv chan *transport.Message | ||||
| 	send chan *transport.Message | ||||
| @@ -22,6 +26,7 @@ type memorySocket struct { | ||||
|  | ||||
| 	local  string | ||||
| 	remote string | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| type memoryClient struct { | ||||
| @@ -34,16 +39,18 @@ type memoryListener struct { | ||||
| 	exit chan bool | ||||
| 	conn chan *memorySocket | ||||
| 	opts transport.ListenOptions | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| type memoryTransport struct { | ||||
| 	opts transport.Options | ||||
|  | ||||
| 	sync.Mutex | ||||
| 	sync.RWMutex | ||||
| 	listeners map[string]*memoryListener | ||||
| } | ||||
|  | ||||
| func (ms *memorySocket) Recv(m *transport.Message) error { | ||||
| 	ms.RLock() | ||||
| 	defer ms.RUnlock() | ||||
| 	select { | ||||
| 	case <-ms.exit: | ||||
| 		return errors.New("connection closed") | ||||
| @@ -64,6 +71,8 @@ func (ms *memorySocket) Remote() string { | ||||
| } | ||||
|  | ||||
| func (ms *memorySocket) Send(m *transport.Message) error { | ||||
| 	ms.RLock() | ||||
| 	defer ms.RUnlock() | ||||
| 	select { | ||||
| 	case <-ms.exit: | ||||
| 		return errors.New("connection closed") | ||||
| @@ -75,6 +84,8 @@ func (ms *memorySocket) Send(m *transport.Message) error { | ||||
| } | ||||
|  | ||||
| func (ms *memorySocket) Close() error { | ||||
| 	ms.RLock() | ||||
| 	defer ms.RUnlock() | ||||
| 	select { | ||||
| 	case <-ms.exit: | ||||
| 		return nil | ||||
| @@ -89,6 +100,8 @@ func (m *memoryListener) Addr() string { | ||||
| } | ||||
|  | ||||
| func (m *memoryListener) Close() error { | ||||
| 	m.Lock() | ||||
| 	defer m.Unlock() | ||||
| 	select { | ||||
| 	case <-m.exit: | ||||
| 		return nil | ||||
| @@ -117,8 +130,8 @@ func (m *memoryListener) Accept(fn func(transport.Socket)) error { | ||||
| } | ||||
|  | ||||
| func (m *memoryTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { | ||||
| 	m.Lock() | ||||
| 	defer m.Unlock() | ||||
| 	m.RLock() | ||||
| 	defer m.RUnlock() | ||||
|  | ||||
| 	listener, ok := m.listeners[addr] | ||||
| 	if !ok { | ||||
| @@ -165,7 +178,6 @@ func (m *memoryTransport) Listen(addr string, opts ...transport.ListenOption) (t | ||||
|  | ||||
| 	// if zero port then randomly assign one | ||||
| 	if len(parts) > 1 && parts[len(parts)-1] == "0" { | ||||
| 		r := rand.New(rand.NewSource(time.Now().UnixNano())) | ||||
| 		i := r.Intn(10000) | ||||
| 		// set addr with port | ||||
| 		addr = fmt.Sprintf("%s:%d", parts[:len(parts)-1], 10000+i) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user