From 27af221fd249a28fb2865d701f611b88bbe1e10f Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 11 Dec 2019 15:23:08 +0000 Subject: [PATCH] batch metric updates --- tunnel/default.go | 1 - tunnel/link.go | 131 +++++++++++++++++++++++++++++++++------------- 2 files changed, 96 insertions(+), 36 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 8abad16b..ff66ea6f 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -281,7 +281,6 @@ func (t *tun) manageLinks() { connect = append(connect, node) } - // delete the dead links if len(delLinks) > 0 { t.Lock() diff --git a/tunnel/link.go b/tunnel/link.go index fe6ba83a..0f5b3980 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -22,6 +22,8 @@ type link struct { // stops the link closed chan bool + // metric used to track metrics + metric chan *metric // link state channel for testing link state chan *packet // send queue for sending packets @@ -65,6 +67,16 @@ type packet struct { err error } +// metric is used to record link rate +type metric struct { + // amount of data sent + data int + // time taken to send + duration time.Duration + // if an error occurred + status error +} + var ( // the 4 byte 0 packet sent to determine the link state linkRequest = []byte{0, 0, 0, 0} @@ -84,6 +96,7 @@ func newLink(s transport.Socket) *link { state: make(chan *packet, 64), sendQueue: make(chan *packet, 128), recvQueue: make(chan *packet, 128), + metric: make(chan *metric, 128), } // process inbound/outbound packets @@ -189,9 +202,11 @@ func (l *link) process() { m := new(transport.Message) err := l.recv(m) if err != nil { - l.Lock() - l.errCount++ - l.Unlock() + // record the metric + select { + case l.metric <- &metric{status: err}: + default: + } } // process new received message @@ -240,8 +255,12 @@ func (l *link) process() { // manage manages the link state including rtt packets and channel mapping expiry func (l *link) manage() { // tick over every minute to expire and fire rtt packets - t := time.NewTicker(time.Minute) - defer t.Stop() + t1 := time.NewTicker(time.Minute) + defer t1.Stop() + + // used to batch update link metrics + t2 := time.NewTicker(time.Second * 5) + defer t2.Stop() // get link id linkId := l.Id() @@ -290,7 +309,7 @@ func (l *link) manage() { // set the RTT l.setRTT(d) } - case <-t.C: + case <-t1.C: // drop any channel mappings older than 2 minutes var kill []string killTime := time.Minute * 2 @@ -318,10 +337,60 @@ func (l *link) manage() { // fire off a link state rtt packet now = time.Now() send(linkRequest) + case <-t2.C: + // get a batch of metrics + batch := l.batch() + + // skip if there's no metrics + if len(batch) == 0 { + continue + } + + // lock once to record a batch + l.Lock() + for _, metric := range batch { + l.record(metric) + } + l.Unlock() } } } +func (l *link) batch() []*metric { + var metrics []*metric + + // pull all the metrics + for { + select { + case m := <-l.metric: + metrics = append(metrics, m) + // non blocking return + default: + return metrics + } + } +} + +func (l *link) record(m *metric) { + // there's an error increment the counter and bail + if m.status != nil { + l.errCount++ + return + } + + // reset the counter + l.errCount = 0 + + // calculate based on data + if m.data > 0 { + // bit sent + bits := m.data * 1024 + + // set the rate + l.setRate(int64(bits), m.duration) + } +} + func (l *link) send(m *transport.Message) error { if m.Header == nil { m.Header = make(map[string]string) @@ -372,7 +441,7 @@ func (l *link) Id() string { l.RLock() id := l.id l.RUnlock() - return l.id + return id } func (l *link) Close() error { @@ -398,6 +467,14 @@ func (l *link) Send(m *transport.Message) error { status: make(chan error, 1), } + // calculate the data sent + dataSent := len(m.Body) + + // set header length + for k, v := range m.Header { + dataSent += (len(k) + len(v)) + } + // get time now now := time.Now() @@ -419,37 +496,21 @@ func (l *link) Send(m *transport.Message) error { case err = <-p.status: } - l.Lock() - - // there's an error increment the counter and bail - if err != nil { - l.errCount++ - l.Unlock() - return err + // create a metric with + // time taken, size of package, error status + mt := &metric{ + data: dataSent, + duration: time.Since(now), + status: err, } - // reset the counter - l.errCount = 0 - - // calculate the data sent - dataSent := len(m.Body) - - // set header length - for k, v := range m.Header { - dataSent += (len(k) + len(v)) + // pass back a metric + // do not block + select { + case l.metric <- mt: + default: } - // calculate based on data - if dataSent > 0 { - // bit sent - bits := dataSent * 1024 - - // set the rate - l.setRate(int64(bits), time.Since(now)) - } - - l.Unlock() - return nil } @@ -488,7 +549,7 @@ func (l *link) State() string { errCount := l.errCount l.RUnlock() - if lerrCount > 3 { + if errCount > 3 { return "error" }