From 5ac58651542503847cae9aba50a97925b1fa5847 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 10:55:53 +0100 Subject: [PATCH 1/4] add comment --- tunnel/link.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tunnel/link.go b/tunnel/link.go index d5c044b9..909ad489 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -173,7 +173,7 @@ func (l *link) Send(m *transport.Message) error { // rate of send in bits per nanosecond rate := float64(bits) / float64(delta.Nanoseconds()) - // + // default the rate if its zero if l.rate == 0 { // rate per second l.rate = rate * 1e9 From 842fc0156869a5210732c8e05f911a5558457a19 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 16:05:21 +0100 Subject: [PATCH 2/4] add send/recv queues for link --- tunnel/default.go | 2 +- tunnel/link.go | 228 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 170 insertions(+), 60 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 6cc48d3c..448fa3f1 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -427,7 +427,7 @@ func (t *tun) listen(link *link) { // process anything via the net interface msg := new(transport.Message) if err := link.Recv(msg); err != nil { - log.Debugf("Tunnel link %s receive error: %#v", link.Remote(), err) + log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err) return } diff --git a/tunnel/link.go b/tunnel/link.go index 909ad489..fd1e3ad9 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -1,6 +1,7 @@ package tunnel import ( + "io" "sync" "time" @@ -14,6 +15,10 @@ type link struct { sync.RWMutex // stops the link closed chan bool + // send queue + sendQueue chan *packet + // receive queue + recvQueue chan *packet // unique id of this link e.g uuid // which we define for ourselves id string @@ -31,25 +36,37 @@ type link struct { lastKeepAlive time.Time // channels keeps a mapping of channels and last seen channels map[string]time.Time - // the weighted moving average roundtrip - rtt int64 - + length int64 // weighted moving average of bits flowing rate float64 - // keep an error count on the link errCount int } +// packet send over link +type packet struct { + // message to send or received + message *transport.Message + + // status returned when sent + status chan error + + // receive related error + err error +} + func newLink(s transport.Socket) *link { l := &link{ Socket: s, id: uuid.New().String(), - channels: make(map[string]time.Time), - closed: make(chan bool), lastKeepAlive: time.Now(), + closed: make(chan bool), + channels: make(map[string]time.Time), + sendQueue: make(chan *packet, 128), + recvQueue: make(chan *packet, 128), } + go l.process() go l.expiry() return l } @@ -58,15 +75,51 @@ func (l *link) setRTT(d time.Duration) { l.Lock() defer l.Unlock() - if l.rtt <= 0 { - l.rtt = d.Nanoseconds() + if l.length <= 0 { + l.length = d.Nanoseconds() return } // https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/ - rtt := 0.8*float64(l.rtt) + 0.2*float64(d.Nanoseconds()) - // set new rtt - l.rtt = int64(rtt) + length := 0.8*float64(l.length) + 0.2*float64(d.Nanoseconds()) + // set new length + l.length = int64(length) +} + +// process deals with the send queue +func (l *link) process() { + // receive messages + go func() { + for { + m := new(transport.Message) + err := l.recv(m) + if err != nil { + l.Lock() + l.errCount++ + l.Unlock() + } + + // process new received message + + select { + case l.recvQueue <- &packet{message: m, err: err}: + case <-l.closed: + return + } + } + }() + + // send messages + + for { + select { + case pk := <-l.sendQueue: + // send the message + pk.status <- l.send(pk.message) + case <-l.closed: + return + } + } } // watches the channel expiry @@ -106,46 +159,7 @@ func (l *link) expiry() { } } -// Delay is the current load on the link -func (l *link) Delay() int64 { - return 0 -} - -// Current transfer rate as bits per second (lower is better) -func (l *link) Rate() float64 { - l.RLock() - defer l.RUnlock() - return l.rate -} - -// Length returns the roundtrip time as nanoseconds (lower is better). -// Returns 0 where no measurement has been taken. -func (l *link) Length() int64 { - l.RLock() - defer l.RUnlock() - - return l.rtt -} - -func (l *link) Id() string { - l.RLock() - defer l.RUnlock() - - return l.id -} - -func (l *link) Close() error { - select { - case <-l.closed: - return nil - default: - close(l.closed) - } - - return nil -} - -func (l *link) Send(m *transport.Message) error { +func (l *link) send(m *transport.Message) error { dataSent := len(m.Body) // set header length @@ -162,6 +176,15 @@ func (l *link) Send(m *transport.Message) error { l.Lock() defer l.Unlock() + // there's an error increment the counter and bail + if err != nil { + l.errCount++ + return err + } + + // reset the counter + l.errCount = 0 + // calculate based on data if dataSent > 0 { // measure time taken @@ -183,17 +206,104 @@ func (l *link) Send(m *transport.Message) error { } } - // if theres no error reset the counter - if err == nil { - l.errCount = 0 - } - - // otherwise increment the counter - l.errCount++ - return err } +// recv a message on the link +func (l *link) recv(m *transport.Message) error { + if m.Header == nil { + m.Header = make(map[string]string) + } + + // TODO measure the receive latency if possible + + // receive the transport message + return l.Socket.Recv(m) +} + +// Delay is the current load on the link +func (l *link) Delay() int64 { + return int64(len(l.sendQueue) + len(l.recvQueue)) +} + +// Current transfer rate as bits per second (lower is better) +func (l *link) Rate() float64 { + l.RLock() + defer l.RUnlock() + + return l.rate +} + +// Length returns the roundtrip time as nanoseconds (lower is better). +// Returns 0 where no measurement has been taken. +func (l *link) Length() int64 { + l.RLock() + defer l.RUnlock() + + return l.length +} + +func (l *link) Id() string { + l.RLock() + defer l.RUnlock() + + return l.id +} + +func (l *link) Close() error { + select { + case <-l.closed: + return nil + default: + close(l.closed) + } + + return nil +} + +// Send sencs a message on the link +func (l *link) Send(m *transport.Message) error { + // create a new packet to send over the link + p := &packet{ + message: m, + status: make(chan error, 1), + } + + select { + case <-l.closed: + return io.EOF + case l.sendQueue <- p: + // in the send queue + } + + // wait for response + select { + case <-l.closed: + return io.EOF + case err := <-p.status: + return err + } + + // never reached + return nil +} + +// Accept accepts a message on the socket +func (l *link) Recv(m *transport.Message) error { + select { + case <-l.closed: + return io.EOF + case pk := <-l.recvQueue: + // check the packet receive error + if pk.err != nil { + return pk.err + } + *m = *pk.message + } + return nil +} + +// Status can return connected, closed, error func (l *link) Status() string { select { case <-l.closed: From fe180148a1ffa74e11cc310e3eef3a793053a673 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 16:15:39 +0100 Subject: [PATCH 3/4] rearrange where we account for errors and data sent --- tunnel/link.go | 101 +++++++++++++++++++++++++------------------------ 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/tunnel/link.go b/tunnel/link.go index fd1e3ad9..06871a13 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -160,53 +160,11 @@ func (l *link) expiry() { } func (l *link) send(m *transport.Message) error { - dataSent := len(m.Body) - - // set header length - for k, v := range m.Header { - dataSent += (len(k) + len(v)) + if m.Header == nil { + m.Header = make(map[string]string) } - - // get time now - now := time.Now() - // send the message - err := l.Socket.Send(m) - - l.Lock() - defer l.Unlock() - - // there's an error increment the counter and bail - if err != nil { - l.errCount++ - return err - } - - // reset the counter - l.errCount = 0 - - // calculate based on data - if dataSent > 0 { - // measure time taken - delta := time.Since(now) - - // bit sent - bits := dataSent * 1024 - - // rate of send in bits per nanosecond - rate := float64(bits) / float64(delta.Nanoseconds()) - - // default the rate if its zero - if l.rate == 0 { - // rate per second - l.rate = rate * 1e9 - } else { - // set new rate per second - l.rate = 0.8*l.rate + 0.2*(rate*1e9) - } - } - - return err + return l.Socket.Send(m) } // recv a message on the link @@ -214,9 +172,6 @@ func (l *link) recv(m *transport.Message) error { if m.Header == nil { m.Header = make(map[string]string) } - - // TODO measure the receive latency if possible - // receive the transport message return l.Socket.Recv(m) } @@ -269,6 +224,10 @@ func (l *link) Send(m *transport.Message) error { status: make(chan error, 1), } + // get time now + now := time.Now() + + // queue the message select { case <-l.closed: return io.EOF @@ -276,15 +235,57 @@ func (l *link) Send(m *transport.Message) error { // in the send queue } + // error to use + var err error + // wait for response select { case <-l.closed: return io.EOF - case err := <-p.status: + case err = <-p.status: + } + + l.Lock() + defer l.Unlock() + + // there's an error increment the counter and bail + if err != nil { + l.errCount++ return err } - // never reached + // reset the counter + l.errCount = 0 + + // calculate the data sent + dataSent := len(m.Body) + + // set header length + for k, v := range m.Header { + dataSent += (len(k) + len(v)) + } + + // calculate based on data + if dataSent > 0 { + // measure time taken + delta := time.Since(now) + + // bit sent + bits := dataSent * 1024 + + // rate of send in bits per nanosecond + rate := float64(bits) / float64(delta.Nanoseconds()) + + // default the rate if its zero + if l.rate == 0 { + // rate per second + l.rate = rate * 1e9 + } else { + // set new rate per second + l.rate = 0.8*l.rate + 0.2*(rate*1e9) + } + } + return nil } From bf4a73d5c0c3fffbbe81db7b0e0299cbd3d637f9 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 16:39:26 +0100 Subject: [PATCH 4/4] Close the socket in the link --- tunnel/link.go | 22 ++++++++++++++++++++-- tunnel/tunnel_test.go | 8 ++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/tunnel/link.go b/tunnel/link.go index 06871a13..036a4830 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -210,6 +210,7 @@ func (l *link) Close() error { case <-l.closed: return nil default: + l.Socket.Close() close(l.closed) } @@ -227,12 +228,19 @@ func (l *link) Send(m *transport.Message) error { // get time now now := time.Now() - // queue the message + // check if its closed first select { case <-l.closed: return io.EOF + default: + } + + // queue the message + select { case l.sendQueue <- p: // in the send queue + case <-l.closed: + return io.EOF } // error to use @@ -293,7 +301,17 @@ func (l *link) Send(m *transport.Message) error { func (l *link) Recv(m *transport.Message) error { select { case <-l.closed: - return io.EOF + // check if there's any messages left + select { + case pk := <-l.recvQueue: + // check the packet receive error + if pk.err != nil { + return pk.err + } + *m = *pk.message + default: + return io.EOF + } case pk := <-l.recvQueue: // check the packet receive error if pk.err != nil { diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index a06a1b01..e884e692 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -202,8 +202,8 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait t.Fatal(err) } - // notify sender we have received the message - <-wait + // notify the sender we have received + wait <- true } func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { @@ -234,7 +234,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr <-wait // give it time to reconnect - time.Sleep(2 * ReconnectTime) + time.Sleep(5 * ReconnectTime) // send the message if err := c.Send(&m); err != nil { @@ -244,7 +244,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr // wait for the listener to receive the message // c.Send merely enqueues the message to the link send queue and returns // in order to verify it was received we wait for the listener to tell us - wait <- true + <-wait } func TestReconnectTunnel(t *testing.T) {