From f26d470db1d7b3be4bb6e3d8b8223eb96999afdf Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 24 Oct 2019 17:51:41 +0100 Subject: [PATCH] A few changes for the network / tunnel link state --- network/default.go | 28 ++++++++---- proxy/mucp/mucp.go | 3 ++ tunnel/default.go | 11 ++--- tunnel/link.go | 109 +++++++++++++++++++++++++++++++++++++-------- 4 files changed, 119 insertions(+), 32 deletions(-) diff --git a/network/default.go b/network/default.go index 741c4fb8..57fcbc26 100644 --- a/network/default.go +++ b/network/default.go @@ -677,19 +677,19 @@ func (n *network) getHopCount(rtr string) int { // the route origin is our peer if _, ok := n.peers[rtr]; ok { - return 2 + return 10 } // the route origin is the peer of our peer for _, peer := range n.peers { for id := range peer.peers { if rtr == id { - return 3 + return 100 } } } // otherwise we are three hops away - return 4 + return 1000 } // getRouteMetric calculates router metric and returns it @@ -721,11 +721,15 @@ func (n *network) getRouteMetric(router string, gateway string, link string) int // make sure length is non-zero length := link.Length() if length == 0 { - length = 10e10 + log.Debugf("Link length is 0 %v %v", link, link.Length()) + length = 10e9 } - return (delay * length * int64(hops)) / 10e9 + log.Debugf("Network calculated metric %v delay %v length %v distance %v", (delay*length*int64(hops))/10e6, delay, length, hops) + return (delay * length * int64(hops)) / 10e6 } + log.Debugf("Network failed to find a link to gateway: %s", gateway) + return math.MaxInt64 } @@ -783,12 +787,18 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { } // calculate route metric and add to the advertised metric // we need to make sure we do not overflow math.MaxInt64 - log.Debugf("Network metric for router %s and gateway %s", event.Route.Router, event.Route.Gateway) - if metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link); metric != math.MaxInt64 { - route.Metric += metric + metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link) + log.Debugf("Network metric for router %s and gateway %s: %v", event.Route.Router, event.Route.Gateway, metric) + + // check we don't overflow max int 64 + if d := route.Metric + metric; d > math.MaxInt64 || d <= 0 { + // set to max int64 if we overflow + route.Metric = math.MaxInt64 } else { - route.Metric = metric + // set the combined value of metrics otherwise + route.Metric = d } + // create router event e := &router.Event{ Type: router.EventType(event.Type), diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index b336e5f3..06ab676f 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -19,6 +19,7 @@ import ( "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/router" "github.com/micro/go-micro/server" + "github.com/micro/go-micro/util/log" ) // Proxy will transparently proxy requests to an endpoint. @@ -294,6 +295,8 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server continue } + log.Debugf("Proxy using route %+v\n", route) + // set the address to call addresses := toNodes([]router.Route{route}) opts = append(opts, client.WithAddress(addresses...)) diff --git a/tunnel/default.go b/tunnel/default.go index 0d16ae90..a7555eb1 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -746,8 +746,13 @@ func (t *tun) setupLink(node string) (*link, error) { } log.Debugf("Tunnel connected to %s", node) + // create a new link + link := newLink(c) + // set link id to remote side + link.id = c.Remote() + // send the first connect message - if err := c.Send(&transport.Message{ + if err := link.Send(&transport.Message{ Header: map[string]string{ "Micro-Tunnel": "connect", "Micro-Tunnel-Id": t.id, @@ -757,10 +762,6 @@ func (t *tun) setupLink(node string) (*link, error) { return nil, err } - // create a new link - link := newLink(c) - // set link id to remote side - link.id = c.Remote() // we made the outbound connection // and sent the connect message link.connected = true diff --git a/tunnel/link.go b/tunnel/link.go index 26b568fc..5c53c91a 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -1,12 +1,14 @@ package tunnel import ( + "bytes" "io" "sync" "time" "github.com/google/uuid" "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/util/log" ) type link struct { @@ -42,6 +44,9 @@ type link struct { rate float64 // keep an error count on the link errCount int + + // link state channel + state chan *packet } // packet send over link @@ -56,21 +61,49 @@ type packet struct { err error } +var ( + // the 4 byte 0 packet sent to determine the link state + linkRequest = []byte{0, 0, 0, 0} + // the 4 byte 1 filled packet sent to determine link state + linkResponse = []byte{1, 1, 1, 1} +) + func newLink(s transport.Socket) *link { l := &link{ Socket: s, id: uuid.New().String(), lastKeepAlive: time.Now(), closed: make(chan bool), + state: make(chan *packet, 64), channels: make(map[string]time.Time), sendQueue: make(chan *packet, 128), recvQueue: make(chan *packet, 128), } + + // process inbound/outbound packets go l.process() - go l.expiry() + // manage the link state + go l.manage() + return l } +// setRate sets the bits per second rate as a float64 +func (l *link) setRate(bits int64, delta time.Duration) { + // rate of send in bits per nanosecond + rate := float64(bits) / float64(delta.Nanoseconds()) + + // default the rate if its zero + if l.rate == 0 { + // rate per second + l.rate = rate * 1e9 + } else { + // set new rate per second + l.rate = 0.8*l.rate + 0.2*(rate*1e9) + } +} + +// setRTT sets a nanosecond based moving average roundtrip time for the link func (l *link) setRTT(d time.Duration) { l.Lock() defer l.Unlock() @@ -101,8 +134,22 @@ func (l *link) process() { // process new received message + pk := &packet{message: m, err: err} + + // this is our link state packet + if m.Header["Micro-Method"] == "link" { + // process link state message + select { + case l.state <- pk: + default: + } + continue + } + + // process all messages as is + select { - case l.recvQueue <- &packet{message: m, err: err}: + case l.recvQueue <- pk: case <-l.closed: return } @@ -122,15 +169,49 @@ func (l *link) process() { } } -// watches the channel expiry -func (l *link) expiry() { +// manage manages the link state including rtt packets and channel mapping expiry +func (l *link) manage() { + // tick over every minute to expire and fire rtt packets t := time.NewTicker(time.Minute) defer t.Stop() + // used to send link state packets + send := func(b []byte) { + l.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Method": "link", + }, Body: b, + }) + } + + // set time now + now := time.Now() + + // send the initial rtt request packet + send(linkRequest) + for { select { + // exit if closed case <-l.closed: return + // process link state rtt packets + case p := <-l.state: + if p.err != nil { + continue + } + // check the type of message + switch { + case bytes.Compare(p.message.Body, linkRequest) == 0: + log.Tracef("Link %s received link request %v", l.id, p.message.Body) + // send response + send(linkResponse) + case bytes.Compare(p.message.Body, linkResponse) == 0: + // set round trip time + d := time.Since(now) + log.Tracef("Link %s received link response in %v", p.message.Body, d) + l.setRTT(d) + } case <-t.C: // drop any channel mappings older than 2 minutes var kill []string @@ -155,6 +236,10 @@ func (l *link) expiry() { delete(l.channels, ch) } l.Unlock() + + // fire off a link state rtt packet + now = time.Now() + send(linkRequest) } } } @@ -278,23 +363,11 @@ func (l *link) Send(m *transport.Message) error { // calculate based on data if dataSent > 0 { - // measure time taken - delta := time.Since(now) - // bit sent bits := dataSent * 1024 - // rate of send in bits per nanosecond - rate := float64(bits) / float64(delta.Nanoseconds()) - - // default the rate if its zero - if l.rate == 0 { - // rate per second - l.rate = rate * 1e9 - } else { - // set new rate per second - l.rate = 0.8*l.rate + 0.2*(rate*1e9) - } + // set the rate + l.setRate(int64(bits), time.Since(now)) } return nil