A few changes for the network / tunnel link state
This commit is contained in:
@@ -746,8 +746,13 @@ func (t *tun) setupLink(node string) (*link, error) {
|
||||
}
|
||||
log.Debugf("Tunnel connected to %s", node)
|
||||
|
||||
// create a new link
|
||||
link := newLink(c)
|
||||
// set link id to remote side
|
||||
link.id = c.Remote()
|
||||
|
||||
// send the first connect message
|
||||
if err := c.Send(&transport.Message{
|
||||
if err := link.Send(&transport.Message{
|
||||
Header: map[string]string{
|
||||
"Micro-Tunnel": "connect",
|
||||
"Micro-Tunnel-Id": t.id,
|
||||
@@ -757,10 +762,6 @@ func (t *tun) setupLink(node string) (*link, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create a new link
|
||||
link := newLink(c)
|
||||
// set link id to remote side
|
||||
link.id = c.Remote()
|
||||
// we made the outbound connection
|
||||
// and sent the connect message
|
||||
link.connected = true
|
||||
|
||||
109
tunnel/link.go
109
tunnel/link.go
@@ -1,12 +1,14 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/transport"
|
||||
"github.com/micro/go-micro/util/log"
|
||||
)
|
||||
|
||||
type link struct {
|
||||
@@ -42,6 +44,9 @@ type link struct {
|
||||
rate float64
|
||||
// keep an error count on the link
|
||||
errCount int
|
||||
|
||||
// link state channel
|
||||
state chan *packet
|
||||
}
|
||||
|
||||
// packet send over link
|
||||
@@ -56,21 +61,49 @@ type packet struct {
|
||||
err error
|
||||
}
|
||||
|
||||
var (
|
||||
// the 4 byte 0 packet sent to determine the link state
|
||||
linkRequest = []byte{0, 0, 0, 0}
|
||||
// the 4 byte 1 filled packet sent to determine link state
|
||||
linkResponse = []byte{1, 1, 1, 1}
|
||||
)
|
||||
|
||||
func newLink(s transport.Socket) *link {
|
||||
l := &link{
|
||||
Socket: s,
|
||||
id: uuid.New().String(),
|
||||
lastKeepAlive: time.Now(),
|
||||
closed: make(chan bool),
|
||||
state: make(chan *packet, 64),
|
||||
channels: make(map[string]time.Time),
|
||||
sendQueue: make(chan *packet, 128),
|
||||
recvQueue: make(chan *packet, 128),
|
||||
}
|
||||
|
||||
// process inbound/outbound packets
|
||||
go l.process()
|
||||
go l.expiry()
|
||||
// manage the link state
|
||||
go l.manage()
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// setRate sets the bits per second rate as a float64
|
||||
func (l *link) setRate(bits int64, delta time.Duration) {
|
||||
// rate of send in bits per nanosecond
|
||||
rate := float64(bits) / float64(delta.Nanoseconds())
|
||||
|
||||
// default the rate if its zero
|
||||
if l.rate == 0 {
|
||||
// rate per second
|
||||
l.rate = rate * 1e9
|
||||
} else {
|
||||
// set new rate per second
|
||||
l.rate = 0.8*l.rate + 0.2*(rate*1e9)
|
||||
}
|
||||
}
|
||||
|
||||
// setRTT sets a nanosecond based moving average roundtrip time for the link
|
||||
func (l *link) setRTT(d time.Duration) {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
@@ -101,8 +134,22 @@ func (l *link) process() {
|
||||
|
||||
// process new received message
|
||||
|
||||
pk := &packet{message: m, err: err}
|
||||
|
||||
// this is our link state packet
|
||||
if m.Header["Micro-Method"] == "link" {
|
||||
// process link state message
|
||||
select {
|
||||
case l.state <- pk:
|
||||
default:
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// process all messages as is
|
||||
|
||||
select {
|
||||
case l.recvQueue <- &packet{message: m, err: err}:
|
||||
case l.recvQueue <- pk:
|
||||
case <-l.closed:
|
||||
return
|
||||
}
|
||||
@@ -122,15 +169,49 @@ func (l *link) process() {
|
||||
}
|
||||
}
|
||||
|
||||
// watches the channel expiry
|
||||
func (l *link) expiry() {
|
||||
// manage manages the link state including rtt packets and channel mapping expiry
|
||||
func (l *link) manage() {
|
||||
// tick over every minute to expire and fire rtt packets
|
||||
t := time.NewTicker(time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
// used to send link state packets
|
||||
send := func(b []byte) {
|
||||
l.Send(&transport.Message{
|
||||
Header: map[string]string{
|
||||
"Micro-Method": "link",
|
||||
}, Body: b,
|
||||
})
|
||||
}
|
||||
|
||||
// set time now
|
||||
now := time.Now()
|
||||
|
||||
// send the initial rtt request packet
|
||||
send(linkRequest)
|
||||
|
||||
for {
|
||||
select {
|
||||
// exit if closed
|
||||
case <-l.closed:
|
||||
return
|
||||
// process link state rtt packets
|
||||
case p := <-l.state:
|
||||
if p.err != nil {
|
||||
continue
|
||||
}
|
||||
// check the type of message
|
||||
switch {
|
||||
case bytes.Compare(p.message.Body, linkRequest) == 0:
|
||||
log.Tracef("Link %s received link request %v", l.id, p.message.Body)
|
||||
// send response
|
||||
send(linkResponse)
|
||||
case bytes.Compare(p.message.Body, linkResponse) == 0:
|
||||
// set round trip time
|
||||
d := time.Since(now)
|
||||
log.Tracef("Link %s received link response in %v", p.message.Body, d)
|
||||
l.setRTT(d)
|
||||
}
|
||||
case <-t.C:
|
||||
// drop any channel mappings older than 2 minutes
|
||||
var kill []string
|
||||
@@ -155,6 +236,10 @@ func (l *link) expiry() {
|
||||
delete(l.channels, ch)
|
||||
}
|
||||
l.Unlock()
|
||||
|
||||
// fire off a link state rtt packet
|
||||
now = time.Now()
|
||||
send(linkRequest)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -278,23 +363,11 @@ func (l *link) Send(m *transport.Message) error {
|
||||
|
||||
// calculate based on data
|
||||
if dataSent > 0 {
|
||||
// measure time taken
|
||||
delta := time.Since(now)
|
||||
|
||||
// bit sent
|
||||
bits := dataSent * 1024
|
||||
|
||||
// rate of send in bits per nanosecond
|
||||
rate := float64(bits) / float64(delta.Nanoseconds())
|
||||
|
||||
// default the rate if its zero
|
||||
if l.rate == 0 {
|
||||
// rate per second
|
||||
l.rate = rate * 1e9
|
||||
} else {
|
||||
// set new rate per second
|
||||
l.rate = 0.8*l.rate + 0.2*(rate*1e9)
|
||||
}
|
||||
// set the rate
|
||||
l.setRate(int64(bits), time.Since(now))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user