diff --git a/network/link.go b/network/link.go index 9dc5b7d7..5b676be5 100644 --- a/network/link.go +++ b/network/link.go @@ -11,7 +11,6 @@ import ( "github.com/micro/go-micro/codec/proto" pb "github.com/micro/go-micro/network/proto" "github.com/micro/go-micro/transport" - "github.com/micro/go-micro/util/log" ) type link struct { @@ -71,12 +70,17 @@ func (l *link) process() { for { m := new(Message) if err := l.recv(m, nil); err != nil { + return + } + + // check if it's an internal close method + if m.Header["Micro-Method"] == "close" { l.Close() return } + select { case l.recvQueue <- m: - log.Debugf("%s processing recv", l.id) case <-l.closed: return } @@ -87,7 +91,6 @@ func (l *link) process() { select { case m := <-l.sendQueue: if err := l.send(m, nil); err != nil { - l.Close() return } case <-l.closed: @@ -265,15 +268,11 @@ func (l *link) recv(m *Message, v interface{}) error { tm := new(transport.Message) - log.Debugf("link %s attempting receiving", l.id) - // receive the transport message if err := l.socket.Recv(tm); err != nil { return err } - log.Debugf("link %s received %+v %+v\n", l.id, tm, v) - // set the message m.Header = tm.Header m.Body = tm.Body @@ -303,14 +302,11 @@ func (l *link) Close() error { } // send a final close message - l.socket.Send(&transport.Message{ + return l.socket.Send(&transport.Message{ Header: map[string]string{ "Micro-Method": "close", }, }) - - // close the socket - return l.socket.Close() } // returns the node id @@ -348,7 +344,7 @@ func (l *link) Weight() int { func (l *link) Accept() (*Message, error) { select { case <-l.closed: - return nil, ErrLinkClosed + return nil, io.EOF case m := <-l.recvQueue: return m, nil } @@ -360,7 +356,7 @@ func (l *link) Accept() (*Message, error) { func (l *link) Send(m *Message) error { select { case <-l.closed: - return ErrLinkClosed + return io.EOF case l.sendQueue <- m: } return nil diff --git a/network/node.go b/network/node.go index 1e6b7cd1..ea05824a 100644 --- a/network/node.go +++ b/network/node.go @@ -3,6 +3,7 @@ package network import ( "errors" "fmt" + "io" "net" "runtime/debug" "sort" @@ -282,7 +283,6 @@ func (n *node) process() { }) // queue the message - log.Debugf("sending on link %s", links[0].id) links[0].Send(m) } n.RUnlock() @@ -304,6 +304,9 @@ func (n *node) manage(l *link) { // so we can judge link saturation both ways. m, err := l.Accept() + if err == io.EOF { + return + } if err != nil { log.Debugf("Error accepting message on link %s: %v", l.id, err) // ???