From d64f8c665ea617a003d56c9cc50e66cb1cf03c25 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 22 Oct 2019 19:38:29 +0100 Subject: [PATCH] add rate measure --- tunnel/default.go | 7 +++--- tunnel/link.go | 43 ++++++++++++++++++++++++++++++++++--- tunnel/tunnel_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 7 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 5ba6b868..6cc48d3c 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -1085,17 +1085,16 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // set the link time t.RLock() - link, ok := t.links[msg.link] + link, ok := t.links[c.link] t.RUnlock() if ok { // set the rountrip time link.setRTT(d) + // set measured to true + measured = true } - // set measured to true - measured = true - // set discovered to true c.discovered = true } diff --git a/tunnel/link.go b/tunnel/link.go index 91636075..d5c044b9 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -32,9 +32,12 @@ type link struct { // channels keeps a mapping of channels and last seen channels map[string]time.Time - // the weighed moving average roundtrip + // the weighted moving average roundtrip rtt int64 + // weighted moving average of bits flowing + rate float64 + // keep an error count on the link errCount int } @@ -55,7 +58,7 @@ func (l *link) setRTT(d time.Duration) { l.Lock() defer l.Unlock() - if l.rtt < 0 { + if l.rtt <= 0 { l.rtt = d.Nanoseconds() return } @@ -110,7 +113,9 @@ func (l *link) Delay() int64 { // Current transfer rate as bits per second (lower is better) func (l *link) Rate() float64 { - return float64(10e8) + l.RLock() + defer l.RUnlock() + return l.rate } // Length returns the roundtrip time as nanoseconds (lower is better). @@ -141,11 +146,43 @@ func (l *link) Close() error { } func (l *link) Send(m *transport.Message) error { + dataSent := len(m.Body) + + // set header length + for k, v := range m.Header { + dataSent += (len(k) + len(v)) + } + + // get time now + now := time.Now() + + // send the message err := l.Socket.Send(m) l.Lock() defer l.Unlock() + // calculate based on data + if dataSent > 0 { + // measure time taken + delta := time.Since(now) + + // bit sent + bits := dataSent * 1024 + + // rate of send in bits per nanosecond + rate := float64(bits) / float64(delta.Nanoseconds()) + + // + if l.rate == 0 { + // rate per second + l.rate = rate * 1e9 + } else { + // set new rate per second + l.rate = 0.8*l.rate + 0.2*(rate*1e9) + } + } + // if theres no error reset the counter if err == nil { l.errCount = 0 diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index fc76421b..a06a1b01 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -292,3 +292,53 @@ func TestReconnectTunnel(t *testing.T) { // wait until done wg.Wait() } + +func TestTunnelRTTRate(t *testing.T) { + // create a new tunnel client + tunA := NewTunnel( + Address("127.0.0.1:9096"), + Nodes("127.0.0.1:9097"), + ) + + // create a new tunnel server + tunB := NewTunnel( + Address("127.0.0.1:9097"), + ) + + // start tunB + err := tunB.Connect() + if err != nil { + t.Fatal(err) + } + defer tunB.Close() + + // start tunA + err = tunA.Connect() + if err != nil { + t.Fatal(err) + } + defer tunA.Close() + + wait := make(chan bool) + + var wg sync.WaitGroup + + wg.Add(1) + // start the listener + go testAccept(t, tunB, wait, &wg) + + wg.Add(1) + // start the client + go testSend(t, tunA, wait, &wg) + + // wait until done + wg.Wait() + + for _, link := range tunA.Links() { + t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate()) + } + + for _, link := range tunB.Links() { + t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate()) + } +}