Merge pull request #877 from micro/tun-delay

Tunnel Delay and link buffers
This commit is contained in:
Asim Aslam 2019-10-23 16:49:18 +01:00 committed by GitHub
commit caca93f65b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 166 additions and 37 deletions

View File

@ -427,7 +427,7 @@ func (t *tun) listen(link *link) {
// process anything via the net interface // process anything via the net interface
msg := new(transport.Message) msg := new(transport.Message)
if err := link.Recv(msg); err != nil { if err := link.Recv(msg); err != nil {
log.Debugf("Tunnel link %s receive error: %#v", link.Remote(), err) log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err)
return return
} }

View File

@ -1,6 +1,7 @@
package tunnel package tunnel
import ( import (
"io"
"sync" "sync"
"time" "time"
@ -14,6 +15,10 @@ type link struct {
sync.RWMutex sync.RWMutex
// stops the link // stops the link
closed chan bool closed chan bool
// send queue
sendQueue chan *packet
// receive queue
recvQueue chan *packet
// unique id of this link e.g uuid // unique id of this link e.g uuid
// which we define for ourselves // which we define for ourselves
id string id string
@ -31,25 +36,37 @@ type link struct {
lastKeepAlive time.Time lastKeepAlive time.Time
// channels keeps a mapping of channels and last seen // channels keeps a mapping of channels and last seen
channels map[string]time.Time channels map[string]time.Time
// the weighted moving average roundtrip // the weighted moving average roundtrip
rtt int64 length int64
// weighted moving average of bits flowing // weighted moving average of bits flowing
rate float64 rate float64
// keep an error count on the link // keep an error count on the link
errCount int errCount int
} }
// packet send over link
type packet struct {
// message to send or received
message *transport.Message
// status returned when sent
status chan error
// receive related error
err error
}
func newLink(s transport.Socket) *link { func newLink(s transport.Socket) *link {
l := &link{ l := &link{
Socket: s, Socket: s,
id: uuid.New().String(), id: uuid.New().String(),
channels: make(map[string]time.Time),
closed: make(chan bool),
lastKeepAlive: time.Now(), lastKeepAlive: time.Now(),
closed: make(chan bool),
channels: make(map[string]time.Time),
sendQueue: make(chan *packet, 128),
recvQueue: make(chan *packet, 128),
} }
go l.process()
go l.expiry() go l.expiry()
return l return l
} }
@ -58,15 +75,51 @@ func (l *link) setRTT(d time.Duration) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
if l.rtt <= 0 { if l.length <= 0 {
l.rtt = d.Nanoseconds() l.length = d.Nanoseconds()
return return
} }
// https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/ // https://fishi.devtail.io/weblog/2015/04/12/measuring-bandwidth-and-round-trip-time-tcp-connection-inside-application-layer/
rtt := 0.8*float64(l.rtt) + 0.2*float64(d.Nanoseconds()) length := 0.8*float64(l.length) + 0.2*float64(d.Nanoseconds())
// set new rtt // set new length
l.rtt = int64(rtt) l.length = int64(length)
}
// process deals with the send queue
func (l *link) process() {
// receive messages
go func() {
for {
m := new(transport.Message)
err := l.recv(m)
if err != nil {
l.Lock()
l.errCount++
l.Unlock()
}
// process new received message
select {
case l.recvQueue <- &packet{message: m, err: err}:
case <-l.closed:
return
}
}
}()
// send messages
for {
select {
case pk := <-l.sendQueue:
// send the message
pk.status <- l.send(pk.message)
case <-l.closed:
return
}
}
} }
// watches the channel expiry // watches the channel expiry
@ -106,15 +159,33 @@ func (l *link) expiry() {
} }
} }
func (l *link) send(m *transport.Message) error {
if m.Header == nil {
m.Header = make(map[string]string)
}
// send the message
return l.Socket.Send(m)
}
// recv a message on the link
func (l *link) recv(m *transport.Message) error {
if m.Header == nil {
m.Header = make(map[string]string)
}
// receive the transport message
return l.Socket.Recv(m)
}
// Delay is the current load on the link // Delay is the current load on the link
func (l *link) Delay() int64 { func (l *link) Delay() int64 {
return 0 return int64(len(l.sendQueue) + len(l.recvQueue))
} }
// Current transfer rate as bits per second (lower is better) // Current transfer rate as bits per second (lower is better)
func (l *link) Rate() float64 { func (l *link) Rate() float64 {
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
return l.rate return l.rate
} }
@ -124,7 +195,7 @@ func (l *link) Length() int64 {
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
return l.rtt return l.length
} }
func (l *link) Id() string { func (l *link) Id() string {
@ -139,13 +210,62 @@ func (l *link) Close() error {
case <-l.closed: case <-l.closed:
return nil return nil
default: default:
l.Socket.Close()
close(l.closed) close(l.closed)
} }
return nil return nil
} }
// Send sencs a message on the link
func (l *link) Send(m *transport.Message) error { func (l *link) Send(m *transport.Message) error {
// create a new packet to send over the link
p := &packet{
message: m,
status: make(chan error, 1),
}
// get time now
now := time.Now()
// check if its closed first
select {
case <-l.closed:
return io.EOF
default:
}
// queue the message
select {
case l.sendQueue <- p:
// in the send queue
case <-l.closed:
return io.EOF
}
// error to use
var err error
// wait for response
select {
case <-l.closed:
return io.EOF
case err = <-p.status:
}
l.Lock()
defer l.Unlock()
// there's an error increment the counter and bail
if err != nil {
l.errCount++
return err
}
// reset the counter
l.errCount = 0
// calculate the data sent
dataSent := len(m.Body) dataSent := len(m.Body)
// set header length // set header length
@ -153,15 +273,6 @@ func (l *link) Send(m *transport.Message) error {
dataSent += (len(k) + len(v)) dataSent += (len(k) + len(v))
} }
// get time now
now := time.Now()
// send the message
err := l.Socket.Send(m)
l.Lock()
defer l.Unlock()
// calculate based on data // calculate based on data
if dataSent > 0 { if dataSent > 0 {
// measure time taken // measure time taken
@ -173,7 +284,7 @@ func (l *link) Send(m *transport.Message) error {
// rate of send in bits per nanosecond // rate of send in bits per nanosecond
rate := float64(bits) / float64(delta.Nanoseconds()) rate := float64(bits) / float64(delta.Nanoseconds())
// // default the rate if its zero
if l.rate == 0 { if l.rate == 0 {
// rate per second // rate per second
l.rate = rate * 1e9 l.rate = rate * 1e9
@ -183,17 +294,35 @@ func (l *link) Send(m *transport.Message) error {
} }
} }
// if theres no error reset the counter return nil
if err == nil {
l.errCount = 0
}
// otherwise increment the counter
l.errCount++
return err
} }
// Accept accepts a message on the socket
func (l *link) Recv(m *transport.Message) error {
select {
case <-l.closed:
// check if there's any messages left
select {
case pk := <-l.recvQueue:
// check the packet receive error
if pk.err != nil {
return pk.err
}
*m = *pk.message
default:
return io.EOF
}
case pk := <-l.recvQueue:
// check the packet receive error
if pk.err != nil {
return pk.err
}
*m = *pk.message
}
return nil
}
// Status can return connected, closed, error
func (l *link) Status() string { func (l *link) Status() string {
select { select {
case <-l.closed: case <-l.closed:

View File

@ -202,8 +202,8 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait
t.Fatal(err) t.Fatal(err)
} }
// notify sender we have received the message // notify the sender we have received
<-wait wait <- true
} }
func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
@ -234,7 +234,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr
<-wait <-wait
// give it time to reconnect // give it time to reconnect
time.Sleep(2 * ReconnectTime) time.Sleep(5 * ReconnectTime)
// send the message // send the message
if err := c.Send(&m); err != nil { if err := c.Send(&m); err != nil {
@ -244,7 +244,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr
// wait for the listener to receive the message // wait for the listener to receive the message
// c.Send merely enqueues the message to the link send queue and returns // c.Send merely enqueues the message to the link send queue and returns
// in order to verify it was received we wait for the listener to tell us // in order to verify it was received we wait for the listener to tell us
wait <- true <-wait
} }
func TestReconnectTunnel(t *testing.T) { func TestReconnectTunnel(t *testing.T) {