batch metric updates

This commit is contained in:
Asim Aslam 2019-12-11 15:23:08 +00:00
parent 6e28e7a86f
commit 27af221fd2
2 changed files with 96 additions and 36 deletions

View File

@ -281,7 +281,6 @@ func (t *tun) manageLinks() {
connect = append(connect, node) connect = append(connect, node)
} }
// delete the dead links // delete the dead links
if len(delLinks) > 0 { if len(delLinks) > 0 {
t.Lock() t.Lock()

View File

@ -22,6 +22,8 @@ type link struct {
// stops the link // stops the link
closed chan bool closed chan bool
// metric is the buffered channel that send/receive metrics are queued on for batch recording
metric chan *metric
// link state channel for testing link // link state channel for testing link
state chan *packet state chan *packet
// send queue for sending packets // send queue for sending packets
@ -65,6 +67,16 @@ type packet struct {
err error err error
} }
// metric is a single sample queued on the link's metric channel and later
// folded into the link state by record. It describes either the outcome of
// one Send or a failed receive (in which case only status is set).
type metric struct {
// amount of data sent, computed with len() over the message body
// plus every header key and value
data int
// time taken to send, measured from just before the packet is
// queued until its status is known
duration time.Duration
// error returned by the send/receive, nil on success; a non-nil
// status is counted against the link's error counter
status error
}
var ( var (
// the 4 byte 0 packet sent to determine the link state // the 4 byte 0 packet sent to determine the link state
linkRequest = []byte{0, 0, 0, 0} linkRequest = []byte{0, 0, 0, 0}
@ -84,6 +96,7 @@ func newLink(s transport.Socket) *link {
state: make(chan *packet, 64), state: make(chan *packet, 64),
sendQueue: make(chan *packet, 128), sendQueue: make(chan *packet, 128),
recvQueue: make(chan *packet, 128), recvQueue: make(chan *packet, 128),
metric: make(chan *metric, 128),
} }
// process inbound/outbound packets // process inbound/outbound packets
@ -189,9 +202,11 @@ func (l *link) process() {
m := new(transport.Message) m := new(transport.Message)
err := l.recv(m) err := l.recv(m)
if err != nil { if err != nil {
l.Lock() // record the metric
l.errCount++ select {
l.Unlock() case l.metric <- &metric{status: err}:
default:
}
} }
// process new received message // process new received message
@ -240,8 +255,12 @@ func (l *link) process() {
// manage manages the link state including rtt packets and channel mapping expiry // manage manages the link state including rtt packets and channel mapping expiry
func (l *link) manage() { func (l *link) manage() {
// tick over every minute to expire and fire rtt packets // tick over every minute to expire and fire rtt packets
t := time.NewTicker(time.Minute) t1 := time.NewTicker(time.Minute)
defer t.Stop() defer t1.Stop()
// used to batch update link metrics
t2 := time.NewTicker(time.Second * 5)
defer t2.Stop()
// get link id // get link id
linkId := l.Id() linkId := l.Id()
@ -290,7 +309,7 @@ func (l *link) manage() {
// set the RTT // set the RTT
l.setRTT(d) l.setRTT(d)
} }
case <-t.C: case <-t1.C:
// drop any channel mappings older than 2 minutes // drop any channel mappings older than 2 minutes
var kill []string var kill []string
killTime := time.Minute * 2 killTime := time.Minute * 2
@ -318,7 +337,57 @@ func (l *link) manage() {
// fire off a link state rtt packet // fire off a link state rtt packet
now = time.Now() now = time.Now()
send(linkRequest) send(linkRequest)
case <-t2.C:
// get a batch of metrics
batch := l.batch()
// skip if there's no metrics
if len(batch) == 0 {
continue
} }
// lock once to record a batch
l.Lock()
for _, metric := range batch {
l.record(metric)
}
l.Unlock()
}
}
}
// batch drains every metric currently queued on l.metric and returns them
// in arrival order. It never blocks: as soon as the channel has nothing
// ready the collected slice (nil when nothing was queued) is returned.
func (l *link) batch() []*metric {
	var drained []*metric

	for {
		select {
		case next := <-l.metric:
			// got one, keep pulling
			drained = append(drained, next)
			continue
		default:
			// channel has nothing ready right now
		}

		return drained
	}
}
func (l *link) record(m *metric) {
// there's an error increment the counter and bail
if m.status != nil {
l.errCount++
return
}
// reset the counter
l.errCount = 0
// calculate based on data
if m.data > 0 {
// bit sent
bits := m.data * 1024
// set the rate
l.setRate(int64(bits), m.duration)
} }
} }
@ -372,7 +441,7 @@ func (l *link) Id() string {
l.RLock() l.RLock()
id := l.id id := l.id
l.RUnlock() l.RUnlock()
return l.id return id
} }
func (l *link) Close() error { func (l *link) Close() error {
@ -398,6 +467,14 @@ func (l *link) Send(m *transport.Message) error {
status: make(chan error, 1), status: make(chan error, 1),
} }
// calculate the data sent
dataSent := len(m.Body)
// set header length
for k, v := range m.Header {
dataSent += (len(k) + len(v))
}
// get time now // get time now
now := time.Now() now := time.Now()
@ -419,37 +496,21 @@ func (l *link) Send(m *transport.Message) error {
case err = <-p.status: case err = <-p.status:
} }
l.Lock() // create a metric with
// time taken, size of package, error status
// there's an error increment the counter and bail mt := &metric{
if err != nil { data: dataSent,
l.errCount++ duration: time.Since(now),
l.Unlock() status: err,
return err
} }
// reset the counter // pass back a metric
l.errCount = 0 // do not block
select {
// calculate the data sent case l.metric <- mt:
dataSent := len(m.Body) default:
// set header length
for k, v := range m.Header {
dataSent += (len(k) + len(v))
} }
// calculate based on data
if dataSent > 0 {
// bit sent
bits := dataSent * 1024
// set the rate
l.setRate(int64(bits), time.Since(now))
}
l.Unlock()
return nil return nil
} }
@ -488,7 +549,7 @@ func (l *link) State() string {
errCount := l.errCount errCount := l.errCount
l.RUnlock() l.RUnlock()
if lerrCount > 3 { if errCount > 3 {
return "error" return "error"
} }