diff --git a/tunnel/default.go b/tunnel/default.go index 6cc48d3c..448fa3f1 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -427,7 +427,7 @@ func (t *tun) listen(link *link) { // process anything via the net interface msg := new(transport.Message) if err := link.Recv(msg); err != nil { - log.Debugf("Tunnel link %s receive error: %#v", link.Remote(), err) + log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err) return } diff --git a/tunnel/link.go b/tunnel/link.go index d5c044b9..036a4830 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -1,6 +1,7 @@ package tunnel import ( + "io" "sync" "time" @@ -14,6 +15,10 @@ type link struct { sync.RWMutex // stops the link closed chan bool + // send queue + sendQueue chan *packet + // receive queue + recvQueue chan *packet // unique id of this link e.g uuid // which we define for ourselves id string @@ -31,25 +36,37 @@ type link struct { lastKeepAlive time.Time // channels keeps a mapping of channels and last seen channels map[string]time.Time - // the weighted moving average roundtrip - rtt int64 - + length int64 // weighted moving average of bits flowing rate float64 - // keep an error count on the link errCount int } +// packet send over link +type packet struct { + // message to send or received + message *transport.Message + + // status returned when sent + status chan error + + // receive related error + err error +} + func newLink(s transport.Socket) *link { l := &link{ Socket: s, id: uuid.New().String(), - channels: make(map[string]time.Time), - closed: make(chan bool), lastKeepAlive: time.Now(), + closed: make(chan bool), + channels: make(map[string]time.Time), + sendQueue: make(chan *packet, 128), + recvQueue: make(chan *packet, 128), } + go l.process() go l.expiry() return l } @@ -58,15 +75,51 @@ func (l *link) setRTT(d time.Duration) { l.Lock() defer l.Unlock() - if l.rtt <= 0 { - l.rtt = d.Nanoseconds() + if l.length <= 0 { + l.length = d.Nanoseconds() return } // https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/ - rtt := 0.8*float64(l.rtt) + 0.2*float64(d.Nanoseconds()) - // set new rtt - l.rtt = int64(rtt) + length := 0.8*float64(l.length) + 0.2*float64(d.Nanoseconds()) + // set new length + l.length = int64(length) +} + +// process deals with the send queue +func (l *link) process() { + // receive messages + go func() { + for { + m := new(transport.Message) + err := l.recv(m) + if err != nil { + l.Lock() + l.errCount++ + l.Unlock() + } + + // process new received message + + select { + case l.recvQueue <- &packet{message: m, err: err}: + case <-l.closed: + return + } + } + }() + + // send messages + + for { + select { + case pk := <-l.sendQueue: + // send the message + pk.status <- l.send(pk.message) + case <-l.closed: + return + } + } } // watches the channel expiry @@ -106,15 +159,33 @@ func (l *link) expiry() { } } +func (l *link) send(m *transport.Message) error { + if m.Header == nil { + m.Header = make(map[string]string) + } + // send the message + return l.Socket.Send(m) +} + +// recv a message on the link +func (l *link) recv(m *transport.Message) error { + if m.Header == nil { + m.Header = make(map[string]string) + } + // receive the transport message + return l.Socket.Recv(m) +} + // Delay is the current load on the link func (l *link) Delay() int64 { - return 0 + return int64(len(l.sendQueue) + len(l.recvQueue)) } // Current transfer rate as bits per second (lower is better) func (l *link) Rate() float64 { l.RLock() defer l.RUnlock() + return l.rate } @@ -124,7 +195,7 @@ func (l *link) Length() int64 { l.RLock() defer l.RUnlock() - return l.rtt + return l.length } func (l *link) Id() string { @@ -139,13 +210,62 @@ func (l *link) Close() error { case <-l.closed: return nil default: + l.Socket.Close() close(l.closed) } return nil } +// Send sencs a message on the link func (l *link) Send(m *transport.Message) error { + // create a new packet to send over the link + p := &packet{ + message: m, + status: make(chan error, 1), + } + + // get time now + now := time.Now() + + // check if its closed first + select { + case <-l.closed: + return io.EOF + default: + } + + // queue the message + select { + case l.sendQueue <- p: + // in the send queue + case <-l.closed: + return io.EOF + } + + // error to use + var err error + + // wait for response + select { + case <-l.closed: + return io.EOF + case err = <-p.status: + } + + l.Lock() + defer l.Unlock() + + // there's an error increment the counter and bail + if err != nil { + l.errCount++ + return err + } + + // reset the counter + l.errCount = 0 + + // calculate the data sent dataSent := len(m.Body) // set header length @@ -153,15 +273,6 @@ func (l *link) Send(m *transport.Message) error { dataSent += (len(k) + len(v)) } - // get time now - now := time.Now() - - // send the message - err := l.Socket.Send(m) - - l.Lock() - defer l.Unlock() - // calculate based on data if dataSent > 0 { // measure time taken @@ -173,7 +284,7 @@ func (l *link) Send(m *transport.Message) error { // rate of send in bits per nanosecond rate := float64(bits) / float64(delta.Nanoseconds()) - // + // default the rate if its zero if l.rate == 0 { // rate per second l.rate = rate * 1e9 @@ -183,17 +294,35 @@ func (l *link) Send(m *transport.Message) error { } } - // if theres no error reset the counter - if err == nil { - l.errCount = 0 - } - - // otherwise increment the counter - l.errCount++ - - return err + return nil } +// Accept accepts a message on the socket +func (l *link) Recv(m *transport.Message) error { + select { + case <-l.closed: + // check if there's any messages left + select { + case pk := <-l.recvQueue: + // check the packet receive error + if pk.err != nil { + return pk.err + } + *m = *pk.message + default: + return io.EOF + } + case pk := <-l.recvQueue: + // check the packet receive error + if pk.err != nil { + return pk.err + } + *m = *pk.message + } + return nil +} + +// Status can return connected, closed, error func (l *link) Status() string { select { case <-l.closed: diff --git a/tunnel/tunnel_test.go b/tunnel/tunnel_test.go index a06a1b01..e884e692 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/tunnel_test.go @@ -202,8 +202,8 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait t.Fatal(err) } - // notify sender we have received the message - <-wait + // notify the sender we have received + wait <- true } func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { @@ -234,7 +234,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr <-wait // give it time to reconnect - time.Sleep(2 * ReconnectTime) + time.Sleep(5 * ReconnectTime) // send the message if err := c.Send(&m); err != nil { @@ -244,7 +244,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr // wait for the listener to receive the message // c.Send merely enqueues the message to the link send queue and returns // in order to verify it was received we wait for the listener to tell us - wait <- true + <-wait } func TestReconnectTunnel(t *testing.T) {