From fe180148a1ffa74e11cc310e3eef3a793053a673 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 16:15:39 +0100 Subject: [PATCH] rearrange where we account for errors and data sent --- tunnel/link.go | 101 +++++++++++++++++++++++++------------------------ 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/tunnel/link.go b/tunnel/link.go index fd1e3ad9..06871a13 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -160,53 +160,11 @@ func (l *link) expiry() { } func (l *link) send(m *transport.Message) error { - dataSent := len(m.Body) - - // set header length - for k, v := range m.Header { - dataSent += (len(k) + len(v)) + if m.Header == nil { + m.Header = make(map[string]string) } - - // get time now - now := time.Now() - // send the message - err := l.Socket.Send(m) - - l.Lock() - defer l.Unlock() - - // there's an error increment the counter and bail - if err != nil { - l.errCount++ - return err - } - - // reset the counter - l.errCount = 0 - - // calculate based on data - if dataSent > 0 { - // measure time taken - delta := time.Since(now) - - // bit sent - bits := dataSent * 1024 - - // rate of send in bits per nanosecond - rate := float64(bits) / float64(delta.Nanoseconds()) - - // default the rate if its zero - if l.rate == 0 { - // rate per second - l.rate = rate * 1e9 - } else { - // set new rate per second - l.rate = 0.8*l.rate + 0.2*(rate*1e9) - } - } - - return err + return l.Socket.Send(m) } // recv a message on the link @@ -214,9 +172,6 @@ func (l *link) recv(m *transport.Message) error { if m.Header == nil { m.Header = make(map[string]string) } - - // TODO measure the receive latency if possible - // receive the transport message return l.Socket.Recv(m) } @@ -269,6 +224,10 @@ func (l *link) Send(m *transport.Message) error { status: make(chan error, 1), } + // get time now + now := time.Now() + + // queue the message select { case <-l.closed: return io.EOF @@ -276,15 +235,57 @@ func (l *link) Send(m *transport.Message) error { // in the send queue } + // error to use + var err error + // wait for response select { case <-l.closed: return io.EOF - case err := <-p.status: + case err = <-p.status: + } + + l.Lock() + defer l.Unlock() + + // there's an error increment the counter and bail + if err != nil { + l.errCount++ return err } - // never reached + // reset the counter + l.errCount = 0 + + // calculate the data sent + dataSent := len(m.Body) + + // set header length + for k, v := range m.Header { + dataSent += (len(k) + len(v)) + } + + // calculate based on data + if dataSent > 0 { + // measure time taken + delta := time.Since(now) + + // bit sent + bits := dataSent * 1024 + + // rate of send in bits per nanosecond + rate := float64(bits) / float64(delta.Nanoseconds()) + + // default the rate if its zero + if l.rate == 0 { + // rate per second + l.rate = rate * 1e9 + } else { + // set new rate per second + l.rate = 0.8*l.rate + 0.2*(rate*1e9) + } + } + return nil }