add rate measure

This commit is contained in:
Asim Aslam 2019-10-22 19:38:29 +01:00
parent 407694232a
commit d64f8c665e
3 changed files with 93 additions and 7 deletions

View File

@ -1085,17 +1085,16 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// set the link time // set the link time
t.RLock() t.RLock()
link, ok := t.links[msg.link] link, ok := t.links[c.link]
t.RUnlock() t.RUnlock()
if ok { if ok {
// set the rountrip time // set the rountrip time
link.setRTT(d) link.setRTT(d)
// set measured to true
measured = true
} }
// set measured to true
measured = true
// set discovered to true // set discovered to true
c.discovered = true c.discovered = true
} }

View File

@ -32,9 +32,12 @@ type link struct {
// channels keeps a mapping of channels and last seen // channels keeps a mapping of channels and last seen
channels map[string]time.Time channels map[string]time.Time
// the weighed moving average roundtrip // the weighted moving average roundtrip
rtt int64 rtt int64
// weighted moving average of bits flowing
rate float64
// keep an error count on the link // keep an error count on the link
errCount int errCount int
} }
@ -55,7 +58,7 @@ func (l *link) setRTT(d time.Duration) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
if l.rtt < 0 { if l.rtt <= 0 {
l.rtt = d.Nanoseconds() l.rtt = d.Nanoseconds()
return return
} }
@ -110,7 +113,9 @@ func (l *link) Delay() int64 {
// Current transfer rate as bits per second (lower is better) // Current transfer rate as bits per second (lower is better)
func (l *link) Rate() float64 { func (l *link) Rate() float64 {
return float64(10e8) l.RLock()
defer l.RUnlock()
return l.rate
} }
// Length returns the roundtrip time as nanoseconds (lower is better). // Length returns the roundtrip time as nanoseconds (lower is better).
@ -141,11 +146,43 @@ func (l *link) Close() error {
} }
func (l *link) Send(m *transport.Message) error { func (l *link) Send(m *transport.Message) error {
dataSent := len(m.Body)
// set header length
for k, v := range m.Header {
dataSent += (len(k) + len(v))
}
// get time now
now := time.Now()
// send the message
err := l.Socket.Send(m) err := l.Socket.Send(m)
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
// calculate based on data
if dataSent > 0 {
// measure time taken
delta := time.Since(now)
// bit sent
bits := dataSent * 1024
// rate of send in bits per nanosecond
rate := float64(bits) / float64(delta.Nanoseconds())
//
if l.rate == 0 {
// rate per second
l.rate = rate * 1e9
} else {
// set new rate per second
l.rate = 0.8*l.rate + 0.2*(rate*1e9)
}
}
// if theres no error reset the counter // if theres no error reset the counter
if err == nil { if err == nil {
l.errCount = 0 l.errCount = 0

View File

@ -292,3 +292,53 @@ func TestReconnectTunnel(t *testing.T) {
// wait until done // wait until done
wg.Wait() wg.Wait()
} }
func TestTunnelRTTRate(t *testing.T) {
// create a new tunnel client
tunA := NewTunnel(
Address("127.0.0.1:9096"),
Nodes("127.0.0.1:9097"),
)
// create a new tunnel server
tunB := NewTunnel(
Address("127.0.0.1:9097"),
)
// start tunB
err := tunB.Connect()
if err != nil {
t.Fatal(err)
}
defer tunB.Close()
// start tunA
err = tunA.Connect()
if err != nil {
t.Fatal(err)
}
defer tunA.Close()
wait := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
// start the listener
go testAccept(t, tunB, wait, &wg)
wg.Add(1)
// start the client
go testSend(t, tunA, wait, &wg)
// wait until done
wg.Wait()
for _, link := range tunA.Links() {
t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate())
}
for _, link := range tunB.Links() {
t.Logf("Link %s length %v rate %v", link.Id(), link.Length(), link.Rate())
}
}