From 407694232ab53489cd6a4c9b3149dc10fa2ca66b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 22 Oct 2019 18:43:09 +0100 Subject: [PATCH 1/2] Measure roundtrip times on link --- tunnel/default.go | 38 +++++++++++++++++++++++++++++++++++++- tunnel/link.go | 26 ++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 73052581..5ba6b868 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -966,7 +966,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { c.mode = options.Mode // set the dial timeout c.timeout = options.Timeout - + // get the current time now := time.Now() after := func() time.Duration { @@ -980,6 +980,8 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } var links []string + // did we measure the rtt + var measured bool // non multicast so we need to find the link if id := options.Link; id != "" { @@ -1021,6 +1023,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // shit fuck if !c.discovered { + // piggy back roundtrip + nowRTT := time.Now() + // create a new discovery message for this channel msg := c.newMessage("discover") msg.mode = Broadcast @@ -1075,12 +1080,31 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { return nil, err } + // set roundtrip + d := time.Since(nowRTT) + + // set the link time + t.RLock() + link, ok := t.links[msg.link] + t.RUnlock() + + if ok { + // set the rountrip time + link.setRTT(d) + } + + // set measured to true + measured = true + // set discovered to true c.discovered = true } // a unicast session so we call "open" and wait for an "accept" + // reset now in case we use it + now = time.Now() + // try to open the session err := c.Open() if err != nil { @@ -1089,6 +1113,18 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { return nil, err } + // if we haven't measured the roundtrip do it now + if !measured && c.mode == Unicast { + // set the link time + t.RLock() + link, ok := t.links[c.link] + t.RUnlock() + if ok { + // set the rountrip time + link.setRTT(time.Since(now)) + } + } + return c, nil } diff --git a/tunnel/link.go b/tunnel/link.go index 22362198..91636075 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -32,6 +32,9 @@ type link struct { // channels keeps a mapping of channels and last seen channels map[string]time.Time + // the weighed moving average roundtrip + rtt int64 + // keep an error count on the link errCount int } @@ -48,6 +51,21 @@ func newLink(s transport.Socket) *link { return l } +func (l *link) setRTT(d time.Duration) { + l.Lock() + defer l.Unlock() + + if l.rtt < 0 { + l.rtt = d.Nanoseconds() + return + } + + // https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/ + rtt := 0.8*float64(l.rtt) + 0.2*float64(d.Nanoseconds()) + // set new rtt + l.rtt = int64(rtt) +} + // watches the channel expiry func (l *link) expiry() { t := time.NewTicker(time.Minute) @@ -95,9 +113,13 @@ func (l *link) Rate() float64 { return float64(10e8) } -// Length returns the roundtrip time as nanoseconds (lower is better) +// Length returns the roundtrip time as nanoseconds (lower is better). +// Returns 0 where no measurement has been taken. func (l *link) Length() int64 { - return time.Second.Nanoseconds() + l.RLock() + defer l.RUnlock() + + return l.rtt } func (l *link) Id() string { From d64f8c665ea617a003d56c9cc50e66cb1cf03c25 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 22 Oct 2019 19:38:29 +0100 Subject: [PATCH 2/2] add rate measure --- tunnel/default.go | 7 +++--- tunnel/link.go | 43 ++++++++++++++++++++++++++++++++++--- tunnel/tunnel_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 7 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 5ba6b868..6cc48d3c 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -1085,17 +1085,16 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // set the link time t.RLock() - link, ok := t.links[msg.link] + link, ok := t.links[c.link] t.RUnlock() if ok { // set the rountrip time link.setRTT(d) + // set measured to true + measured = true } - // set measured to true - measured = true - // set discovered to true c.discovered = true } diff --git a/tunnel/link.go b/tunnel/link.go index 91636075..d5c044b9 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -32,9 +32,12 @@ type link struct { // channels keeps a mapping of channels and last seen channels map[string]time.Time - // the weighed moving average roundtrip + // the weighted moving average roundtrip rtt int64 + // weighted moving average of bits flowing + rate float64 + // keep an error count on the link errCount int } @@ -55,7 +58,7 @@ func (l *link) setRTT(d time.Duration) { l.Lock() defer l.Unlock() - if l.rtt < 0 { + if l.rtt <= 0 { l.rtt = d.Nanoseconds() return } @@ -110,7 +113,9 @@ func (l *link) Delay() int64 { // Current transfer rate as bits per second (lower is better) func (l *link) Rate() float64 { - return float64(10e8) + l.RLock() + defer l.RUnlock() + return l.rate } // Length returns the roundtrip time as nanoseconds (lower is better). @@ -141,11 +146,43 @@ func (l *link) Close() error { } func (l *link) Send(m *transport.Message) error { + dataSent := len(m.Body) + + // set header length + for k, v := range m.Header { + dataSent += (len(k) + len(v)) + } + + // get time now + now := time.Now() + + // send the message err := l.Socket.Send(m) l.Lock() defer l.Unlock() + // calculate based on data + if dataSent > 0 { + // measure time taken + delta := time.Since(now) + + // bit sent + bits := dataSent * 1024 + + // rate of send in bits per nanosecond + rate := float64(bits) / float64(delta.Nanoseconds()) + + // + if l.rate == 0 { + // rate per second + l.rate = rate * 1e9 + } else { + // set new rate per second + l.rate = 0.8*l.rate + 0.2*(rate*1e9) + } + } + // if theres no error reset the counter if err == nil { l.errCount = 0 diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index fc76421b..a06a1b01 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -292,3 +292,53 @@ func TestReconnectTunnel(t *testing.T) { // wait until done wg.Wait() } + +func TestTunnelRTTRate(t *testing.T) { + // create a new tunnel client + tunA := NewTunnel( + Address("127.0.0.1:9096"), + Nodes("127.0.0.1:9097"), + ) + + // create a new tunnel server + tunB := NewTunnel( + Address("127.0.0.1:9097"), + ) + + // start tunB + err := tunB.Connect() + if err != nil { + t.Fatal(err) + } + defer tunB.Close() + + // start tunA + err = tunA.Connect() + if err != nil { + t.Fatal(err) + } + defer tunA.Close() + + wait := make(chan bool) + + var wg sync.WaitGroup + + wg.Add(1) + // start the listener + go testAccept(t, tunB, wait, &wg) + + wg.Add(1) + // start the client + go testSend(t, tunA, wait, &wg) + + // wait until done + wg.Wait() + + for _, link := range tunA.Links() { + t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate()) + } + + for _, link := range tunB.Links() { + t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate()) + } +}