From 842fc0156869a5210732c8e05f911a5558457a19 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 23 Oct 2019 16:05:21 +0100 Subject: [PATCH] add send/recv queues for link --- tunnel/default.go | 2 +- tunnel/link.go | 228 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 170 insertions(+), 60 deletions(-) diff --git a/tunnel/default.go b/tunnel/default.go index 6cc48d3c..448fa3f1 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -427,7 +427,7 @@ func (t *tun) listen(link *link) { // process anything via the net interface msg := new(transport.Message) if err := link.Recv(msg); err != nil { - log.Debugf("Tunnel link %s receive error: %#v", link.Remote(), err) + log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err) return } diff --git a/tunnel/link.go b/tunnel/link.go index 909ad489..fd1e3ad9 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -1,6 +1,7 @@ package tunnel import ( + "io" "sync" "time" @@ -14,6 +15,10 @@ type link struct { sync.RWMutex // stops the link closed chan bool + // send queue + sendQueue chan *packet + // receive queue + recvQueue chan *packet // unique id of this link e.g uuid // which we define for ourselves id string @@ -31,25 +36,37 @@ type link struct { lastKeepAlive time.Time // channels keeps a mapping of channels and last seen channels map[string]time.Time - // the weighted moving average roundtrip - rtt int64 - + length int64 // weighted moving average of bits flowing rate float64 - // keep an error count on the link errCount int } +// packet send over link +type packet struct { + // message to send or received + message *transport.Message + + // status returned when sent + status chan error + + // receive related error + err error +} + func newLink(s transport.Socket) *link { l := &link{ Socket: s, id: uuid.New().String(), - channels: make(map[string]time.Time), - closed: make(chan bool), lastKeepAlive: time.Now(), + closed: make(chan bool), + channels: make(map[string]time.Time), + sendQueue: make(chan *packet, 128), + recvQueue: make(chan *packet, 128), } + go l.process() go l.expiry() return l } @@ -58,15 +75,51 @@ func (l *link) setRTT(d time.Duration) { l.Lock() defer l.Unlock() - if l.rtt <= 0 { - l.rtt = d.Nanoseconds() + if l.length <= 0 { + l.length = d.Nanoseconds() return } // https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/ - rtt := 0.8*float64(l.rtt) + 0.2*float64(d.Nanoseconds()) - // set new rtt - l.rtt = int64(rtt) + length := 0.8*float64(l.length) + 0.2*float64(d.Nanoseconds()) + // set new length + l.length = int64(length) +} + +// process deals with the send queue +func (l *link) process() { + // receive messages + go func() { + for { + m := new(transport.Message) + err := l.recv(m) + if err != nil { + l.Lock() + l.errCount++ + l.Unlock() + } + + // process new received message + + select { + case l.recvQueue <- &packet{message: m, err: err}: + case <-l.closed: + return + } + } + }() + + // send messages + + for { + select { + case pk := <-l.sendQueue: + // send the message + pk.status <- l.send(pk.message) + case <-l.closed: + return + } + } } // watches the channel expiry @@ -106,46 +159,7 @@ func (l *link) expiry() { } } -// Delay is the current load on the link -func (l *link) Delay() int64 { - return 0 -} - -// Current transfer rate as bits per second (lower is better) -func (l *link) Rate() float64 { - l.RLock() - defer l.RUnlock() - return l.rate -} - -// Length returns the roundtrip time as nanoseconds (lower is better). -// Returns 0 where no measurement has been taken. -func (l *link) Length() int64 { - l.RLock() - defer l.RUnlock() - - return l.rtt -} - -func (l *link) Id() string { - l.RLock() - defer l.RUnlock() - - return l.id -} - -func (l *link) Close() error { - select { - case <-l.closed: - return nil - default: - close(l.closed) - } - - return nil -} - -func (l *link) Send(m *transport.Message) error { +func (l *link) send(m *transport.Message) error { dataSent := len(m.Body) // set header length @@ -162,6 +176,15 @@ func (l *link) Send(m *transport.Message) error { l.Lock() defer l.Unlock() + // there's an error increment the counter and bail + if err != nil { + l.errCount++ + return err + } + + // reset the counter + l.errCount = 0 + // calculate based on data if dataSent > 0 { // measure time taken @@ -183,17 +206,104 @@ func (l *link) Send(m *transport.Message) error { } } - // if theres no error reset the counter - if err == nil { - l.errCount = 0 - } - - // otherwise increment the counter - l.errCount++ - return err } +// recv a message on the link +func (l *link) recv(m *transport.Message) error { + if m.Header == nil { + m.Header = make(map[string]string) + } + + // TODO measure the receive latency if possible + + // receive the transport message + return l.Socket.Recv(m) +} + +// Delay is the current load on the link +func (l *link) Delay() int64 { + return int64(len(l.sendQueue) + len(l.recvQueue)) +} + +// Current transfer rate as bits per second (lower is better) +func (l *link) Rate() float64 { + l.RLock() + defer l.RUnlock() + + return l.rate +} + +// Length returns the roundtrip time as nanoseconds (lower is better). +// Returns 0 where no measurement has been taken. +func (l *link) Length() int64 { + l.RLock() + defer l.RUnlock() + + return l.length +} + +func (l *link) Id() string { + l.RLock() + defer l.RUnlock() + + return l.id +} + +func (l *link) Close() error { + select { + case <-l.closed: + return nil + default: + close(l.closed) + } + + return nil +} + +// Send sencs a message on the link +func (l *link) Send(m *transport.Message) error { + // create a new packet to send over the link + p := &packet{ + message: m, + status: make(chan error, 1), + } + + select { + case <-l.closed: + return io.EOF + case l.sendQueue <- p: + // in the send queue + } + + // wait for response + select { + case <-l.closed: + return io.EOF + case err := <-p.status: + return err + } + + // never reached + return nil +} + +// Accept accepts a message on the socket +func (l *link) Recv(m *transport.Message) error { + select { + case <-l.closed: + return io.EOF + case pk := <-l.recvQueue: + // check the packet receive error + if pk.err != nil { + return pk.err + } + *m = *pk.message + } + return nil +} + +// Status can return connected, closed, error func (l *link) Status() string { select { case <-l.closed: