micro/tunnel/default.go

1239 lines
27 KiB
Go
Raw Normal View History

2019-08-05 19:41:48 +01:00
package tunnel
import (
2019-08-07 18:44:33 +01:00
"errors"
"math/rand"
2019-08-30 20:05:00 +01:00
"strings"
2019-08-05 19:41:48 +01:00
"sync"
"time"
2019-08-05 19:41:48 +01:00
2019-08-07 18:44:33 +01:00
"github.com/google/uuid"
2019-08-05 19:41:48 +01:00
"github.com/micro/go-micro/transport"
2019-08-07 18:44:33 +01:00
"github.com/micro/go-micro/util/log"
2019-08-05 19:41:48 +01:00
)
var (
	// DiscoverTime is the interval between the periodic "discover" messages
	// sent on each managed link (it is the ticker period used by manageLink).
	DiscoverTime = 60 * time.Second
	// KeepAliveTime is the interval between the "keepalive" messages sent on
	// each managed link (it is the ticker period used by manageLink).
	KeepAliveTime = 30 * time.Second
	// ReconnectTime is the interval at which manage re-checks the links:
	// dead links are purged and configured nodes without a link are dialed.
	ReconnectTime = 5 * time.Second
)
2019-08-07 18:44:33 +01:00
// tun represents a network tunnel
2019-08-05 19:41:48 +01:00
type tun struct {
2019-08-07 18:44:33 +01:00
options Options
2019-08-05 19:41:48 +01:00
sync.RWMutex
2019-08-07 18:44:33 +01:00
2019-08-30 20:05:00 +01:00
// the unique id for this tunnel
id string
// tunnel token for session encryption
token string
2019-08-07 18:44:33 +01:00
// to indicate if we're connected or not
2019-08-05 19:41:48 +01:00
connected bool
2019-08-07 18:44:33 +01:00
// the send channel for all messages
send chan *message
// close channel
closed chan bool
2019-08-30 20:05:00 +01:00
// a map of sessions based on Micro-Tunnel-Channel
sessions map[string]*session
2019-08-07 18:44:33 +01:00
// outbound links
links map[string]*link
// listener
listener transport.Listener
2019-08-05 19:41:48 +01:00
}
2019-08-07 18:44:33 +01:00
// create new tunnel on top of a link
func newTunnel(opts ...Option) *tun {
options := DefaultOptions()
2019-08-05 19:41:48 +01:00
for _, o := range opts {
o(&options)
}
2019-08-07 18:44:33 +01:00
return &tun{
2019-08-30 20:05:00 +01:00
options: options,
id: options.Id,
token: options.Token,
send: make(chan *message, 128),
closed: make(chan bool),
sessions: make(map[string]*session),
links: make(map[string]*link),
2019-08-05 19:41:48 +01:00
}
2019-08-07 18:44:33 +01:00
}
2019-08-05 19:41:48 +01:00
2019-08-14 13:26:23 +01:00
// Init initializes tunnel options
func (t *tun) Init(opts ...Option) error {
t.Lock()
defer t.Unlock()
2019-08-14 13:26:23 +01:00
for _, o := range opts {
o(&t.options)
}
return nil
}
2019-08-30 20:05:00 +01:00
// getSession returns a session from the internal session map.
// It does this based on the Micro-Tunnel-Channel and Micro-Tunnel-Session
func (t *tun) getSession(channel, session string) (*session, bool) {
// get the session
2019-08-07 18:44:33 +01:00
t.RLock()
2019-08-30 20:05:00 +01:00
s, ok := t.sessions[channel+session]
2019-08-07 18:44:33 +01:00
t.RUnlock()
return s, ok
2019-08-05 19:41:48 +01:00
}
// delSession closes the session stored for channel and session, if there is
// one, and removes its entry from the session map.
func (t *tun) delSession(channel, session string) {
	key := channel + session

	t.Lock()
	existing, ok := t.sessions[key]
	if ok {
		existing.Close()
	}
	delete(t.sessions, key)
	t.Unlock()
}
// listChannels returns the channel of every session whose session id is
// "listener", i.e. the channels this tunnel is listening on. The result is
// nil when there are none.
func (t *tun) listChannels() []string {
	t.RLock()
	defer t.RUnlock()

	//nolint:prealloc
	var names []string
	for _, s := range t.sessions {
		if s.session == "listener" {
			names = append(names, s.channel)
		}
	}

	return names
}
2019-08-30 20:05:00 +01:00
// newSession creates a new session and saves it
func (t *tun) newSession(channel, sessionId string) (*session, bool) {
// new session
s := &session{
2019-09-03 15:56:37 +01:00
tunnel: t.id,
2019-08-30 20:05:00 +01:00
channel: channel,
session: sessionId,
token: t.token,
2019-08-07 18:44:33 +01:00
closed: make(chan bool),
recv: make(chan *message, 128),
send: t.send,
2019-08-30 20:05:00 +01:00
errChan: make(chan error, 1),
2019-08-07 18:44:33 +01:00
}
2019-08-30 20:05:00 +01:00
// save session
2019-08-07 18:44:33 +01:00
t.Lock()
2019-08-30 20:05:00 +01:00
_, ok := t.sessions[channel+sessionId]
2019-08-07 18:44:33 +01:00
if ok {
2019-08-30 20:05:00 +01:00
// session already exists
2019-08-07 18:44:33 +01:00
t.Unlock()
return nil, false
}
2019-08-30 20:05:00 +01:00
t.sessions[channel+sessionId] = s
2019-08-07 18:44:33 +01:00
t.Unlock()
2019-08-30 20:05:00 +01:00
// return session
2019-08-07 18:44:33 +01:00
return s, true
2019-08-05 19:41:48 +01:00
}
2019-08-07 18:44:33 +01:00
// TODO: use tunnel id as part of the session
2019-08-30 20:05:00 +01:00
func (t *tun) newSessionId() string {
2019-08-07 18:44:33 +01:00
return uuid.New().String()
2019-08-06 11:45:25 +01:00
}
// announce sends an "announce" message over link telling the other side
// which channel(s) this tunnel listens on.
//
// With an empty channel every listening channel is announced as a comma
// separated list; nothing is sent when there are none. With a non-empty
// channel the announcement is only sent if this tunnel has a listener for
// that channel. A send failure is logged and otherwise ignored.
func (t *tun) announce(channel, session string, link *link) {
	// build the announce message that answers a discover request
	headers := map[string]string{
		"Micro-Tunnel":         "announce",
		"Micro-Tunnel-Id":      t.id,
		"Micro-Tunnel-Channel": channel,
		"Micro-Tunnel-Session": session,
		"Micro-Tunnel-Link":    link.id,
	}
	msg := &transport.Message{Header: headers}

	if len(channel) > 0 {
		// a single channel was asked for: only answer if we listen on it
		if _, listening := t.getSession(channel, "listener"); !listening {
			return
		}
	} else {
		// no channel given: announce everything we listen on
		all := t.listChannels()
		if len(all) == 0 {
			return
		}
		channel = strings.Join(all, ",")
		msg.Header["Micro-Tunnel-Channel"] = channel
	}

	log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel)

	err := link.Send(msg)
	if err != nil {
		log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err)
	}
}
2019-12-07 23:28:39 +00:00
// manage monitors outbound links and attempts to reconnect to the failed ones
func (t *tun) manage() {
reconnect := time.NewTicker(ReconnectTime)
defer reconnect.Stop()
for {
select {
case <-t.closed:
return
case <-reconnect.C:
2019-12-07 23:28:39 +00:00
t.manageLinks()
}
}
}
2019-12-08 12:12:20 +00:00
// manageLink sends channel discover requests periodically and
// keepalive messages to link
func (t *tun) manageLink(link *link) {
keepalive := time.NewTicker(KeepAliveTime)
defer keepalive.Stop()
discover := time.NewTicker(DiscoverTime)
defer discover.Stop()
for {
select {
case <-t.closed:
return
case <-link.closed:
return
case <-discover.C:
// send a discovery message to all links
2019-12-08 13:45:24 +00:00
if err := t.sendMsg("discover", link); err != nil {
2019-12-08 12:12:20 +00:00
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
}
case <-keepalive.C:
// send keepalive message
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
2019-12-08 13:45:24 +00:00
if err := t.sendMsg("keepalive", link); err != nil {
2019-12-08 12:12:20 +00:00
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote())
return
}
}
}
}
2019-12-07 23:28:39 +00:00
// manageLinks is a function that can be called to immediately to link setup
func (t *tun) manageLinks() {
var delLinks []string
2019-12-07 23:28:39 +00:00
t.RLock()
2019-12-07 23:28:39 +00:00
// check the link status and purge dead links
for node, link := range t.links {
// check link status
switch link.State() {
case "closed":
delLinks = append(delLinks, node)
case "error":
delLinks = append(delLinks, node)
}
}
2019-12-07 23:28:39 +00:00
t.RUnlock()
2019-09-02 12:05:47 +01:00
2019-12-07 23:28:39 +00:00
// delete the dead links
if len(delLinks) > 0 {
t.Lock()
for _, node := range delLinks {
log.Debugf("Tunnel deleting dead link for %s", node)
if link, ok := t.links[node]; ok {
link.Close()
delete(t.links, node)
2019-09-02 12:05:47 +01:00
}
2019-12-07 23:28:39 +00:00
}
t.Unlock()
}
2019-09-02 12:05:47 +01:00
2019-12-07 23:28:39 +00:00
// check current link status
var connect []string
// build list of unknown nodes to connect to
t.RLock()
for _, node := range t.options.Nodes {
if _, ok := t.links[node]; !ok {
connect = append(connect, node)
}
}
2019-12-07 23:28:39 +00:00
t.RUnlock()
var wg sync.WaitGroup
for _, node := range connect {
wg.Add(1)
2019-12-08 00:53:55 +00:00
go func(node string) {
2019-12-07 23:28:39 +00:00
defer wg.Done()
// create new link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
return
}
// save the link
t.Lock()
2019-12-08 12:12:20 +00:00
defer t.Unlock()
// just check nothing else was setup in the interim
if _, ok := t.links[node]; ok {
link.Close()
return
}
// save the link
2019-12-07 23:28:39 +00:00
t.links[node] = link
2019-12-08 00:53:55 +00:00
}(node)
2019-12-07 23:28:39 +00:00
}
// wait for all threads to finish
wg.Wait()
}
2019-08-30 20:05:00 +01:00
// process outgoing messages sent by all local sessions
2019-08-07 18:44:33 +01:00
func (t *tun) process() {
// manage the send buffer
2019-08-30 20:05:00 +01:00
// all pseudo sessions throw everything down this
2019-08-07 18:44:33 +01:00
for {
select {
case msg := <-t.send:
newMsg := &transport.Message{
Header: make(map[string]string),
2019-08-07 18:44:33 +01:00
}
// set the data
if msg.data != nil {
for k, v := range msg.data.Header {
newMsg.Header[k] = v
}
newMsg.Body = msg.data.Body
2019-08-08 12:45:37 +01:00
}
2019-08-20 17:20:21 +01:00
// set message head
2019-08-29 12:42:27 +01:00
newMsg.Header["Micro-Tunnel"] = msg.typ
2019-08-20 17:20:21 +01:00
2019-08-07 18:44:33 +01:00
// set the tunnel id on the outgoing message
2019-09-03 15:56:37 +01:00
newMsg.Header["Micro-Tunnel-Id"] = msg.tunnel
2019-08-07 18:44:33 +01:00
2019-08-30 20:05:00 +01:00
// set the tunnel channel on the outgoing message
newMsg.Header["Micro-Tunnel-Channel"] = msg.channel
2019-08-07 18:44:33 +01:00
// set the session id
newMsg.Header["Micro-Tunnel-Session"] = msg.session
2019-08-07 18:44:33 +01:00
// send the message via the interface
t.RLock()
2019-08-29 12:42:27 +01:00
2019-08-11 18:11:33 +01:00
if len(t.links) == 0 {
log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
2019-08-11 18:11:33 +01:00
}
2019-08-29 12:42:27 +01:00
2019-08-30 20:05:00 +01:00
var sent bool
var err error
var sendTo []*link
2019-08-30 20:05:00 +01:00
// build the list of links ot send to
for node, link := range t.links {
// get the values we need
link.RLock()
id := link.id
connected := link.connected
loopback := link.loopback
_, exists := link.channels[msg.channel]
link.RUnlock()
// if the link is not connected skip it
if !connected {
log.Debugf("Link for node %s not connected", node)
2019-08-30 20:05:00 +01:00
err = errors.New("link not connected")
continue
}
// if the link was a loopback accepted connection
// and the message is being sent outbound via
// a dialled connection don't use this link
if loopback && msg.outbound {
log.Tracef("Link for node %s is loopback", node)
2019-08-30 20:05:00 +01:00
err = errors.New("link is loopback")
2019-08-14 17:14:39 +01:00
continue
}
// if the message was being returned by the loopback listener
// send it back up the loopback link only
if msg.loopback && !loopback {
log.Tracef("Link for message %s is loopback", node)
2019-08-30 20:05:00 +01:00
err = errors.New("link is not loopback")
continue
}
// check the multicast mappings
if msg.mode == Multicast {
// channel mapping not found in link
if !exists {
continue
}
2019-09-04 18:46:20 +01:00
} else {
// if we're picking the link check the id
// this is where we explicitly set the link
// in a message received via the listen method
if len(msg.link) > 0 && id != msg.link {
2019-09-04 18:46:20 +01:00
err = errors.New("link not found")
continue
}
}
// add to link list
sendTo = append(sendTo, link)
}
t.RUnlock()
// send the message
for _, link := range sendTo {
2019-08-29 12:42:27 +01:00
// send the message via the current link
log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote())
2019-08-30 20:05:00 +01:00
if errr := link.Send(newMsg); errr != nil {
2019-09-25 20:24:56 +01:00
log.Debugf("Tunnel error sending %+v to %s: %v", newMsg.Header, link.Remote(), errr)
2019-08-30 20:05:00 +01:00
err = errors.New(errr.Error())
t.delLink(link.Remote())
continue
}
2019-08-30 20:05:00 +01:00
// is sent
sent = true
// keep sending broadcast messages
2019-10-15 15:40:04 +01:00
if msg.mode > Unicast {
continue
}
// break on unicast
break
2019-08-07 18:44:33 +01:00
}
2019-08-29 12:42:27 +01:00
var gerr error
2019-08-30 20:05:00 +01:00
// set the error if not sent
2019-08-30 20:05:00 +01:00
if !sent {
gerr = err
}
// skip if its not been set
if msg.errChan == nil {
continue
}
2019-08-30 20:05:00 +01:00
// return error non blocking
select {
case msg.errChan <- gerr:
default:
}
2019-08-07 18:44:33 +01:00
case <-t.closed:
return
}
}
2019-08-05 19:41:48 +01:00
}
func (t *tun) delLink(remote string) {
t.Lock()
defer t.Unlock()
// get the link
for id, link := range t.links {
if link.id != remote {
continue
}
// close and delete
2019-09-12 17:40:47 -07:00
log.Debugf("Tunnel deleting link node: %s remote: %s", id, link.Remote())
link.Close()
delete(t.links, id)
}
}
2019-08-07 18:44:33 +01:00
// process incoming messages
2019-08-14 17:14:39 +01:00
func (t *tun) listen(link *link) {
// remove the link on exit
defer func() {
t.delLink(link.Remote())
}()
// let us know if its a loopback
var loopback bool
var connected bool
// set the connected value
link.RLock()
connected = link.connected
link.RUnlock()
2019-08-07 18:44:33 +01:00
for {
// process anything via the net interface
msg := new(transport.Message)
if err := link.Recv(msg); err != nil {
2019-10-23 16:05:21 +01:00
log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err)
2019-08-07 18:44:33 +01:00
return
}
// TODO: figure out network authentication
// for now we use tunnel token to encrypt/decrypt
// session communication, but we will probably need
// some sort of network authentication (token) to avoid
// having rogue actors spamming the network
2019-08-30 20:05:00 +01:00
2019-09-03 15:56:37 +01:00
// message type
mtype := msg.Header["Micro-Tunnel"]
// the tunnel id
id := msg.Header["Micro-Tunnel-Id"]
// the tunnel channel
channel := msg.Header["Micro-Tunnel-Channel"]
// the session id
sessionId := msg.Header["Micro-Tunnel-Session"]
// if its not connected throw away the link
// the first message we process needs to be connect
if !connected && mtype != "connect" {
log.Debugf("Tunnel link %s not connected", link.id)
return
}
// this state machine block handles the only message types
// that we know or care about; connect, close, open, accept,
// discover, announce, session, keepalive
2019-09-03 15:56:37 +01:00
switch mtype {
case "connect":
log.Debugf("Tunnel link %s received connect message", link.Remote())
2019-08-30 20:05:00 +01:00
2019-09-04 18:46:20 +01:00
link.Lock()
// check if we're connecting to ourselves?
2019-08-30 20:05:00 +01:00
if id == t.id {
2019-08-14 17:14:39 +01:00
link.loopback = true
loopback = true
}
2019-08-20 17:20:21 +01:00
// set to remote node
2019-09-12 17:12:49 -07:00
link.id = link.Remote()
// set as connected
link.connected = true
connected = true
2019-09-04 18:46:20 +01:00
link.Unlock()
// save the link once connected
t.Lock()
t.links[link.Remote()] = link
t.Unlock()
2019-12-08 13:45:24 +00:00
// send back an announcement of our channels discovery
go t.announce("", "", link)
2019-12-08 13:45:24 +00:00
// ask for the things on the other wise
go t.sendMsg("discover", link)
2019-08-20 17:20:21 +01:00
// nothing more to do
continue
case "close":
log.Debugf("Tunnel link %s received close message", link.Remote())
// if there is no channel then we close the link
// as its a signal from the other side to close the connection
if len(channel) == 0 {
log.Debugf("Tunnel link %s received close message", link.Remote())
return
}
// the entire listener was closed by the remote side so we need to
// remove the channel mapping for it. should we also close sessions?
if sessionId == "listener" {
link.delChannel(channel)
continue
}
// assuming there's a channel and session
// try get the dialing socket
s, exists := t.getSession(channel, sessionId)
2019-12-01 19:43:36 +00:00
if exists && !loopback {
// only delete the session if its unicast
// otherwise ignore close on the multicast
2019-12-01 19:43:36 +00:00
if s.mode == Unicast {
// only delete this if its unicast
// but not if its a loopback conn
t.delSession(channel, sessionId)
continue
}
}
// otherwise its a session mapping of sorts
case "keepalive":
log.Debugf("Tunnel link %s received keepalive", link.Remote())
2019-08-29 12:42:27 +01:00
// save the keepalive
link.keepalive()
continue
// a new connection dialled outbound
case "open":
2019-09-04 18:46:20 +01:00
log.Debugf("Tunnel link %s received open %s %s", link.id, channel, sessionId)
// we just let it pass through to be processed
// an accept returned by the listener
case "accept":
2019-09-04 18:46:20 +01:00
s, exists := t.getSession(channel, sessionId)
// just set accepted on anything not unicast
2019-10-15 15:40:04 +01:00
if exists && s.mode > Unicast {
2019-09-04 18:46:20 +01:00
s.accepted = true
continue
}
// if its already accepted move on
2019-09-04 18:46:20 +01:00
if exists && s.accepted {
continue
}
// otherwise we're going to process to accept
// a continued session
2019-09-03 15:56:37 +01:00
case "session":
2019-08-20 17:20:21 +01:00
// process message
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
// an announcement of a channel listener
case "announce":
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
// process the announcement
channels := strings.Split(channel, ",")
// update mapping in the link
link.setChannel(channels...)
// this was an announcement not intended for anything
// if the dialing side sent "discover" then a session
// id would be present. We skip in case of multicast.
switch sessionId {
case "listener", "multicast", "":
continue
}
// get the session that asked for the discovery
s, exists := t.getSession(channel, sessionId)
if exists {
// don't bother it's already discovered
if s.discovered {
continue
}
msg := &message{
typ: "announce",
tunnel: id,
channel: channel,
session: sessionId,
link: link.id,
}
// send the announce back to the caller
select {
case <-s.closed:
case s.recv <- msg:
}
}
continue
case "discover":
// send back an announcement
go t.announce(channel, sessionId, link)
continue
2019-08-20 17:20:21 +01:00
default:
// blackhole it
continue
2019-08-07 18:44:33 +01:00
}
2019-08-30 20:05:00 +01:00
// strip tunnel message header
for k := range msg.Header {
2019-08-30 20:05:00 +01:00
if strings.HasPrefix(k, "Micro-Tunnel") {
delete(msg.Header, k)
}
}
2019-08-20 17:20:21 +01:00
2019-08-07 18:44:33 +01:00
// if the session id is blank there's nothing we can do
// TODO: check this is the case, is there any reason
// why we'd have a blank session? Is the tunnel
// used for some other purpose?
2019-08-30 20:05:00 +01:00
if len(channel) == 0 || len(sessionId) == 0 {
2019-08-07 18:44:33 +01:00
continue
}
2019-08-30 20:05:00 +01:00
var s *session
2019-08-07 18:44:33 +01:00
var exists bool
// If its a loopback connection then we've enabled link direction
// listening side is used for listening, the dialling side for dialling
switch {
case loopback, mtype == "open":
2019-08-30 20:05:00 +01:00
s, exists = t.getSession(channel, "listener")
// only return accept to the session
case mtype == "accept":
log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId)
s, exists = t.getSession(channel, sessionId)
if exists && s.accepted {
continue
}
default:
2019-08-30 20:05:00 +01:00
// get the session based on the tunnel id and session
// this could be something we dialed in which case
// we have a session for it otherwise its a listener
2019-08-30 20:05:00 +01:00
s, exists = t.getSession(channel, sessionId)
if !exists {
// try get it based on just the tunnel id
// the assumption here is that a listener
// has no session but its set a listener session
2019-08-30 20:05:00 +01:00
s, exists = t.getSession(channel, "listener")
}
2019-08-07 18:44:33 +01:00
}
// bail if no session or listener has been found
2019-08-07 18:44:33 +01:00
if !exists {
log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId)
2019-08-07 18:44:33 +01:00
// drop it, we don't care about
// messages we don't know about
continue
}
2019-08-30 20:05:00 +01:00
// is the session closed?
2019-08-07 18:44:33 +01:00
select {
case <-s.closed:
// closed
2019-08-30 20:05:00 +01:00
delete(t.sessions, channel)
2019-08-07 18:44:33 +01:00
continue
default:
// otherwise process
2019-08-07 18:44:33 +01:00
}
log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype)
2019-08-07 18:44:33 +01:00
// construct a new transport message
tmsg := &transport.Message{
Header: msg.Header,
Body: msg.Body,
}
// construct the internal message
imsg := &message{
2019-09-03 15:56:37 +01:00
tunnel: id,
typ: mtype,
2019-08-30 20:05:00 +01:00
channel: channel,
session: sessionId,
mode: s.mode,
data: tmsg,
2019-08-29 12:42:27 +01:00
link: link.id,
loopback: loopback,
2019-08-30 20:05:00 +01:00
errChan: make(chan error, 1),
2019-08-07 18:44:33 +01:00
}
// append to recv backlog
// we don't block if we can't pass it on
select {
case s.recv <- imsg:
default:
}
}
2019-08-05 19:41:48 +01:00
}
2019-12-08 13:45:24 +00:00
// sendMsg sends a body-less tunnel message of the given type (method) over
// link, stamped with this tunnel's id, and returns the link's send error.
func (t *tun) sendMsg(method string, link *link) error {
	headers := map[string]string{
		"Micro-Tunnel":    method,
		"Micro-Tunnel-Id": t.id,
	}
	msg := &transport.Message{Header: headers}

	return link.Send(msg)
}
2019-08-15 19:24:24 +01:00
// setupLink connects to node and returns link if successful
// It returns error if the link failed to be established
func (t *tun) setupLink(node string) (*link, error) {
2019-08-29 16:58:07 +01:00
log.Debugf("Tunnel setting up link: %s", node)
2019-12-07 23:28:39 +00:00
c, err := t.options.Transport.Dial(node)
if err != nil {
log.Debugf("Tunnel failed to connect to %s: %v", node, err)
return nil, err
}
2019-12-07 23:28:39 +00:00
log.Debugf("Tunnel connected to %s", node)
// create a new link
link := newLink(c)
// set link id to remote side
link.id = c.Remote()
// send the first connect message
2019-12-08 13:45:24 +00:00
if err := t.sendMsg("connect", link); err != nil {
2019-12-08 00:53:55 +00:00
link.Close()
return nil, err
}
2019-08-29 12:42:27 +01:00
// we made the outbound connection
// and sent the connect message
link.connected = true
// process incoming messages
go t.listen(link)
2019-12-08 12:12:20 +00:00
// manage keepalives and discovery messages
go t.manageLink(link)
return link, nil
}
2019-12-08 13:45:24 +00:00
// setupLinks dials every configured node that has no link yet and stores
// the links that could be established. Nodes that fail to connect are
// logged and skipped. It returns once all dial attempts have finished.
//
// The caller must hold the tunnel's write lock (Connect does), so t.links
// is only touched from the calling goroutine. The dials run concurrently;
// their results are collected in a local map guarded by its own mutex and
// merged into t.links after all of them have finished. Reading or writing
// t.links directly from the dialing goroutines would be an unsynchronised
// concurrent map access.
func (t *tun) setupLinks() {
	var (
		wg      sync.WaitGroup
		mtx     sync.Mutex
		dialled = make(map[string]*link)
		pending = make(map[string]struct{}, len(t.options.Nodes))
	)

	for _, node := range t.options.Nodes {
		// we're not trying to fix existing links
		if _, ok := t.links[node]; ok {
			continue
		}
		// a node listed more than once is only dialed once
		if _, ok := pending[node]; ok {
			continue
		}
		pending[node] = struct{}{}

		wg.Add(1)

		go func(node string) {
			defer wg.Done()

			// create new link
			l, err := t.setupLink(node)
			if err != nil {
				log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
				return
			}

			// hand the link back to the calling goroutine
			mtx.Lock()
			dialled[node] = l
			mtx.Unlock()
		}(node)
	}

	// wait for all dials to finish
	wg.Wait()

	// save the links
	for node, l := range dialled {
		t.links[node] = l
	}
}
2019-08-14 13:26:23 +01:00
// connect the tunnel to all the nodes and listen for incoming tunnel connections
2019-08-07 18:44:33 +01:00
func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address)
if err != nil {
return err
}
// save the listener
t.listener = l
go func() {
// accept inbound connections
err := l.Accept(func(sock transport.Socket) {
2019-08-08 13:15:30 +01:00
log.Debugf("Tunnel accepted connection from %s", sock.Remote())
// create a new link
2019-08-29 12:42:27 +01:00
link := newLink(sock)
2019-08-07 18:44:33 +01:00
2019-12-08 12:12:20 +00:00
// manage the link
go t.manageLink(link)
2019-09-12 17:12:49 -07:00
// listen for inbound messages.
// only save the link once connected.
// we do this inside liste
2019-08-14 17:14:39 +01:00
t.listen(link)
2019-08-07 18:44:33 +01:00
})
2019-08-29 12:42:27 +01:00
t.RLock()
defer t.RUnlock()
2019-08-07 18:44:33 +01:00
// still connected but the tunnel died
if err != nil && t.connected {
log.Logf("Tunnel listener died: %v", err)
}
}()
2019-08-05 19:41:48 +01:00
return nil
}
2019-08-14 13:26:23 +01:00
// Connect the tunnel
func (t *tun) Connect() error {
t.Lock()
defer t.Unlock()
// already connected
if t.connected {
2019-12-08 12:12:20 +00:00
// do it immediately
2019-12-08 13:45:24 +00:00
t.setupLinks()
2019-10-13 18:36:22 +01:00
// setup links
2019-08-14 13:26:23 +01:00
return nil
}
2019-12-07 23:28:39 +00:00
// connect the tunnel: start the listener
2019-08-14 13:26:23 +01:00
if err := t.connect(); err != nil {
return err
}
// set as connected
t.connected = true
// create new close channel
t.closed = make(chan bool)
// process outbound messages to be sent
// process sends to all links
go t.process()
2019-12-08 13:45:24 +00:00
// call setup before managing them
t.setupLinks()
2019-12-08 12:12:20 +00:00
// manage the links
go t.manage()
2019-08-14 13:26:23 +01:00
return nil
}
2019-08-07 18:44:33 +01:00
func (t *tun) close() error {
2019-09-04 18:46:20 +01:00
// close all the sessions
for id, s := range t.sessions {
s.Close()
delete(t.sessions, id)
}
2019-08-07 18:44:33 +01:00
// close all the links
for node, link := range t.links {
2019-08-07 18:44:33 +01:00
link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "close",
"Micro-Tunnel-Id": t.id,
2019-08-07 18:44:33 +01:00
},
})
link.Close()
delete(t.links, node)
2019-08-07 18:44:33 +01:00
}
// close the listener
2019-09-05 15:23:19 +01:00
// this appears to be blocking
return t.listener.Close()
2019-08-05 19:41:48 +01:00
}
// pickLink returns the connected link with the smallest metric, where the
// metric is delay x length x rate as reported by the link. Links whose
// state is not "connected" are not considered.
//
// If none of the links is connected a random one of links is returned. If
// links is empty, nil is returned.
func (t *tun) pickLink(links []*link) *link {
	// nothing to choose from; rand.Intn would panic on zero
	if len(links) == 0 {
		return nil
	}

	var metric float64
	var chosen *link

	// find the best link
	for _, candidate := range links {
		// don't use disconnected or errored links
		if candidate.State() != "connected" {
			continue
		}

		// get the link state info
		delay := float64(candidate.Delay())
		length := float64(candidate.Length())
		rate := candidate.Rate()

		// metric = delay x length x rate
		m := delay * length * rate

		// The first connected link seeds the comparison, wherever it sits
		// in the slice; after that only a strictly smaller metric wins.
		if chosen == nil || m < metric {
			metric = m
			chosen = candidate
		}
	}

	// no connected link at all: fall back to a random one
	if chosen == nil {
		return links[rand.Intn(len(links))]
	}

	return chosen
}
2019-08-21 12:55:10 +01:00
// Address returns the listener's address while the tunnel is connected and
// the configured address otherwise.
func (t *tun) Address() string {
	t.RLock()
	defer t.RUnlock()

	if t.connected {
		return t.listener.Addr()
	}

	return t.options.Address
}
2019-08-07 18:44:33 +01:00
// Close the tunnel
func (t *tun) Close() error {
t.Lock()
defer t.Unlock()
if !t.connected {
return nil
}
log.Debug("Tunnel closing")
2019-08-05 19:41:48 +01:00
select {
case <-t.closed:
2019-08-07 18:44:33 +01:00
return nil
2019-08-05 19:41:48 +01:00
default:
2019-08-07 18:44:33 +01:00
close(t.closed)
t.connected = false
2019-08-05 19:41:48 +01:00
}
2019-08-07 18:44:33 +01:00
// send a close message
// we don't close the link
// just the tunnel
return t.close()
2019-08-05 19:41:48 +01:00
}
2019-08-07 18:44:33 +01:00
// Dial an address
func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
2019-08-30 20:05:00 +01:00
log.Debugf("Tunnel dialing %s", channel)
c, ok := t.newSession(channel, t.newSessionId())
2019-08-07 18:44:33 +01:00
if !ok {
2019-08-30 20:05:00 +01:00
return nil, errors.New("error dialing " + channel)
2019-08-07 18:44:33 +01:00
}
// set remote
2019-08-30 20:05:00 +01:00
c.remote = channel
2019-08-07 18:44:33 +01:00
// set local
c.local = "local"
2019-08-30 20:05:00 +01:00
// outbound session
2019-08-14 17:14:39 +01:00
c.outbound = true
2019-08-07 18:44:33 +01:00
// get opts
options := DialOptions{
Timeout: DefaultDialTimeout,
}
for _, o := range opts {
o(&options)
}
// set the multicast option
2019-10-15 15:40:04 +01:00
c.mode = options.Mode
// set the dial timeout
2019-12-07 23:28:39 +00:00
c.dialTimeout = options.Timeout
// set read timeout set to never
c.readTimeout = time.Duration(-1)
2019-09-04 18:46:20 +01:00
var links []*link
2019-10-22 18:43:09 +01:00
// did we measure the rtt
var measured bool
t.RLock()
// non multicast so we need to find the link
for _, link := range t.links {
// use the link specified it its available
if id := options.Link; len(id) > 0 && link.id != id {
continue
}
2019-09-11 11:57:41 -07:00
// get the channel
lastMapped := link.getChannel(channel)
// we have at least one channel mapping
if !lastMapped.IsZero() {
links = append(links, link)
c.discovered = true
}
}
t.RUnlock()
// link not found and one was specified so error out
if len(links) == 0 && len(options.Link) > 0 {
// delete session and return error
t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound)
return nil, ErrLinkNotFound
2019-09-11 11:57:41 -07:00
}
2019-09-05 07:41:19 +01:00
// discovered so set the link if not multicast
2019-10-15 15:40:04 +01:00
if c.discovered && c.mode == Unicast {
// pickLink will pick the best link
link := t.pickLink(links)
// set the link
c.link = link.id
}
// if its not already discovered we need to attempt to do so
if !c.discovered {
2019-10-22 18:43:09 +01:00
// piggy back roundtrip
nowRTT := time.Now()
// attempt to discover the link
err := c.Discover()
if err != nil {
t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err)
return nil, err
}
2019-09-05 17:40:41 +01:00
2019-10-22 18:43:09 +01:00
// set roundtrip
d := time.Since(nowRTT)
// set the link time
t.RLock()
2019-10-22 19:38:29 +01:00
link, ok := t.links[c.link]
2019-10-22 18:43:09 +01:00
t.RUnlock()
if ok {
// set the rountrip time
link.setRTT(d)
2019-10-22 19:38:29 +01:00
// set measured to true
measured = true
2019-10-22 18:43:09 +01:00
}
}
// return early if its not unicast
// we will not call "open" for multicast
if c.mode != Unicast {
return c, nil
}
// Note: we go no further for multicast or broadcast.
// This is a unicast session so we call "open" and wait
// for an "accept"
2019-10-22 18:43:09 +01:00
// reset now in case we use it
now := time.Now()
2019-10-22 18:43:09 +01:00
// try to open the session
if err := c.Open(); err != nil {
// delete the session
t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err)
return nil, err
}
// set time take to open
d := time.Since(now)
2019-10-22 18:43:09 +01:00
// if we haven't measured the roundtrip do it now
if !measured {
2019-10-22 18:43:09 +01:00
// set the link time
t.RLock()
link, ok := t.links[c.link]
t.RUnlock()
2019-10-22 18:43:09 +01:00
if ok {
// set the rountrip time
link.setRTT(d)
2019-10-22 18:43:09 +01:00
}
}
2019-08-07 18:44:33 +01:00
return c, nil
}
// Accept a connection on the address
2019-10-15 15:40:04 +01:00
func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
2019-08-30 20:05:00 +01:00
log.Debugf("Tunnel listening on %s", channel)
2019-12-07 23:28:39 +00:00
options := ListenOptions{
// Read timeout defaults to never
Timeout: time.Duration(-1),
}
2019-10-15 15:40:04 +01:00
for _, o := range opts {
o(&options)
}
2019-08-30 20:05:00 +01:00
// create a new session by hashing the address
c, ok := t.newSession(channel, "listener")
2019-08-07 18:44:33 +01:00
if !ok {
2019-08-30 20:05:00 +01:00
return nil, errors.New("already listening on " + channel)
2019-08-07 18:44:33 +01:00
}
// delete function removes the session when closed
delFunc := func() {
t.delSession(channel, "listener")
}
2019-08-07 18:44:33 +01:00
// set remote. it will be replaced by the first message received
c.remote = "remote"
// set local
2019-08-30 20:05:00 +01:00
c.local = channel
2019-10-15 15:40:04 +01:00
// set mode
c.mode = options.Mode
2019-12-07 23:28:39 +00:00
// set the timeout
c.readTimeout = options.Timeout
2019-08-07 18:44:33 +01:00
tl := &tunListener{
2019-08-30 20:05:00 +01:00
channel: channel,
// tunnel token
token: t.token,
2019-08-07 18:44:33 +01:00
// the accept channel
2019-08-30 20:05:00 +01:00
accept: make(chan *session, 128),
2019-08-07 18:44:33 +01:00
// the channel to close
closed: make(chan bool),
// tunnel closed channel
tunClosed: t.closed,
2019-08-30 20:05:00 +01:00
// the listener session
session: c,
// delete session
delFunc: delFunc,
2019-08-07 18:44:33 +01:00
}
// this kicks off the internal message processor
2019-08-30 20:05:00 +01:00
// for the listener so it can create pseudo sessions
2019-08-07 18:44:33 +01:00
// per session if they do not exist or pass messages
// to the existign sessions
go tl.process()
// announces the listener channel to others
go tl.announce()
2019-08-07 18:44:33 +01:00
// return the listener
return tl, nil
2019-08-05 19:41:48 +01:00
}
2019-08-20 17:21:35 +01:00
2019-09-04 15:41:57 +01:00
func (t *tun) Links() []Link {
t.RLock()
defer t.RUnlock()
links := make([]Link, 0, len(t.links))
2019-09-04 15:41:57 +01:00
for _, link := range t.links {
links = append(links, link)
}
return links
}
2019-08-20 17:21:35 +01:00
// String returns the name of this tunnel implementation, "mucp".
func (t *tun) String() string {
	const name = "mucp"
	return name
}