Fix some link connection logic

This commit is contained in:
Asim Aslam 2019-07-03 19:51:40 +01:00
parent e54de56376
commit 2644497ccb
2 changed files with 13 additions and 14 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/codec/proto"
pb "github.com/micro/go-micro/network/proto" pb "github.com/micro/go-micro/network/proto"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/micro/go-micro/util/log"
) )
type link struct { type link struct {
@ -71,12 +70,17 @@ func (l *link) process() {
for { for {
m := new(Message) m := new(Message)
if err := l.recv(m, nil); err != nil { if err := l.recv(m, nil); err != nil {
return
}
// check if it's an internal close method
if m.Header["Micro-Method"] == "close" {
l.Close() l.Close()
return return
} }
select { select {
case l.recvQueue <- m: case l.recvQueue <- m:
log.Debugf("%s processing recv", l.id)
case <-l.closed: case <-l.closed:
return return
} }
@ -87,7 +91,6 @@ func (l *link) process() {
select { select {
case m := <-l.sendQueue: case m := <-l.sendQueue:
if err := l.send(m, nil); err != nil { if err := l.send(m, nil); err != nil {
l.Close()
return return
} }
case <-l.closed: case <-l.closed:
@ -265,15 +268,11 @@ func (l *link) recv(m *Message, v interface{}) error {
tm := new(transport.Message) tm := new(transport.Message)
log.Debugf("link %s attempting receiving", l.id)
// receive the transport message // receive the transport message
if err := l.socket.Recv(tm); err != nil { if err := l.socket.Recv(tm); err != nil {
return err return err
} }
log.Debugf("link %s received %+v %+v\n", l.id, tm, v)
// set the message // set the message
m.Header = tm.Header m.Header = tm.Header
m.Body = tm.Body m.Body = tm.Body
@ -303,14 +302,11 @@ func (l *link) Close() error {
} }
// send a final close message // send a final close message
l.socket.Send(&transport.Message{ return l.socket.Send(&transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Method": "close", "Micro-Method": "close",
}, },
}) })
// close the socket
return l.socket.Close()
} }
// returns the node id // returns the node id
@ -348,7 +344,7 @@ func (l *link) Weight() int {
func (l *link) Accept() (*Message, error) { func (l *link) Accept() (*Message, error) {
select { select {
case <-l.closed: case <-l.closed:
return nil, ErrLinkClosed return nil, io.EOF
case m := <-l.recvQueue: case m := <-l.recvQueue:
return m, nil return m, nil
} }
@ -360,7 +356,7 @@ func (l *link) Accept() (*Message, error) {
func (l *link) Send(m *Message) error { func (l *link) Send(m *Message) error {
select { select {
case <-l.closed: case <-l.closed:
return ErrLinkClosed return io.EOF
case l.sendQueue <- m: case l.sendQueue <- m:
} }
return nil return nil

View File

@ -3,6 +3,7 @@ package network
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"runtime/debug" "runtime/debug"
"sort" "sort"
@ -282,7 +283,6 @@ func (n *node) process() {
}) })
// queue the message // queue the message
log.Debugf("sending on link %s", links[0].id)
links[0].Send(m) links[0].Send(m)
} }
n.RUnlock() n.RUnlock()
@ -304,6 +304,9 @@ func (n *node) manage(l *link) {
// so we can judge link saturation both ways. // so we can judge link saturation both ways.
m, err := l.Accept() m, err := l.Accept()
if err == io.EOF {
return
}
if err != nil { if err != nil {
log.Debugf("Error accepting message on link %s: %v", l.id, err) log.Debugf("Error accepting message on link %s: %v", l.id, err)
// ??? // ???