Monitor outbound links and delete them when disconnected

This commit is contained in:
Milos Gajdos 2019-08-15 16:52:16 +01:00
parent a42de29f67
commit 740cfab8d0
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -4,13 +4,20 @@ import (
"crypto/sha256" "crypto/sha256"
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
) )
var (
// DefaultKeepAlive defines interval we check if the links are alive
DefaultKeepAlive = 30 * time.Second
)
// tun represents a network tunnel // tun represents a network tunnel
type tun struct { type tun struct {
options Options options Options
@ -41,8 +48,9 @@ type tun struct {
type link struct { type link struct {
transport.Socket transport.Socket
id string id string
loopback bool loopback bool
lastKeepAlive time.Time
} }
// create new tunnel on top of a link // create new tunnel on top of a link
@ -118,6 +126,11 @@ func (t *tun) newSession() string {
return uuid.New().String() return uuid.New().String()
} }
// monitor monitors all links and attempts to dial the failed ones
func (t *tun) monitor() {
// TODO: implement this
}
// process outgoing messages sent by all local sockets // process outgoing messages sent by all local sockets
func (t *tun) process() { func (t *tun) process() {
// manage the send buffer // manage the send buffer
@ -144,19 +157,24 @@ func (t *tun) process() {
newMsg.Header["Micro-Tunnel-Token"] = t.token newMsg.Header["Micro-Tunnel-Token"] = t.token
// send the message via the interface // send the message via the interface
t.RLock() t.Lock()
if len(t.links) == 0 { if len(t.links) == 0 {
log.Debugf("Zero links to send to") log.Debugf("Zero links to send to")
} }
for _, link := range t.links { for _, link := range t.links {
// TODO: error check and reconnect
log.Debugf("Sending %+v to %s", newMsg, link.Remote())
if link.loopback && msg.outbound { if link.loopback && msg.outbound {
continue continue
} }
link.Send(newMsg) log.Debugf("Sending %+v to %s", newMsg, link.Remote())
if err := link.Send(newMsg); err != nil {
log.Debugf("Error sending %+v to %s: %v", newMsg, link.Remote(), err)
if err == io.EOF {
delete(t.links, link.id)
continue
}
}
} }
t.RUnlock() t.Unlock()
case <-t.closed: case <-t.closed:
return return
} }
@ -165,9 +183,6 @@ func (t *tun) process() {
// process incoming messages // process incoming messages
func (t *tun) listen(link *link) { func (t *tun) listen(link *link) {
// loopback flag
var loopback bool
for { for {
// process anything via the net interface // process anything via the net interface
msg := new(transport.Message) msg := new(transport.Message)
@ -179,6 +194,7 @@ func (t *tun) listen(link *link) {
switch msg.Header["Micro-Tunnel"] { switch msg.Header["Micro-Tunnel"] {
case "connect": case "connect":
log.Debugf("Tunnel link %s received connect message", link.Remote())
// check the Micro-Tunnel-Token // check the Micro-Tunnel-Token
token, ok := msg.Header["Micro-Tunnel-Token"] token, ok := msg.Header["Micro-Tunnel-Token"]
if !ok { if !ok {
@ -187,14 +203,18 @@ func (t *tun) listen(link *link) {
// are we connecting to ourselves? // are we connecting to ourselves?
if token == t.token { if token == t.token {
loopback = true
link.loopback = true link.loopback = true
} }
continue continue
case "close": case "close":
log.Debugf("Tunnel link %s closing connection", link.Remote())
// TODO: handle the close message // TODO: handle the close message
// maybe report io.EOF or kill the link // maybe report io.EOF or kill the link
continue continue
case "ping":
log.Debugf("Tunnel link %s received keepalive", link.Remote())
link.lastKeepAlive = time.Now()
continue
} }
// the tunnel id // the tunnel id
@ -219,7 +239,7 @@ func (t *tun) listen(link *link) {
log.Debugf("Received %+v from %s", msg, link.Remote()) log.Debugf("Received %+v from %s", msg, link.Remote())
switch { switch {
case loopback: case link.loopback:
s, exists = t.getSocket(id, "listener") s, exists = t.getSocket(id, "listener")
default: default:
// get the socket based on the tunnel id and session // get the socket based on the tunnel id and session
@ -286,6 +306,36 @@ func (t *tun) listen(link *link) {
} }
} }
// keepalive periodically sends ping messages to link
func (t *tun) keepalive(link *link) {
ping := time.NewTicker(DefaultKeepAlive)
defer ping.Stop()
for {
select {
case <-t.closed:
return
case <-ping.C:
// send ping message
log.Debugf("Tunnel sending ping to link %v: %v", link.Remote(), link.id)
if err := link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "ping",
"Micro-Tunnel-Token": t.token,
},
}); err != nil {
if err == io.EOF {
t.Lock()
delete(t.links, link.id)
t.Unlock()
return
}
// TODO: handle this error
}
}
}
}
// connect the tunnel to all the nodes and listen for incoming tunnel connections // connect the tunnel to all the nodes and listen for incoming tunnel connections
func (t *tun) connect() error { func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address) l, err := t.options.Transport.Listen(t.options.Address)
@ -365,6 +415,9 @@ func (t *tun) connect() error {
// process incoming messages // process incoming messages
go t.listen(link) go t.listen(link)
// start keepalive monitor
go t.keepalive(link)
} }
// process outbound messages to be sent // process outbound messages to be sent