2019-07-10 17:36:04 +01:00
|
|
|
package tunnel
|
|
|
|
|
|
|
|
import (
|
|
|
|
"crypto/sha256"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/micro/go-micro/network/link"
|
|
|
|
"github.com/micro/go-micro/transport"
|
|
|
|
)
|
|
|
|
|
|
|
|
// tun represents a network tunnel
|
|
|
|
type tun struct {
|
2019-07-10 18:35:10 +01:00
|
|
|
// the link on top of which we build a tunnel
|
2019-07-10 17:36:04 +01:00
|
|
|
link link.Link
|
|
|
|
|
|
|
|
sync.RWMutex
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// to indicate if we're connected or not
|
2019-07-10 17:36:04 +01:00
|
|
|
connected bool
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// the send channel for all messages
|
2019-07-10 17:36:04 +01:00
|
|
|
send chan *message
|
2019-07-10 18:35:10 +01:00
|
|
|
|
2019-07-10 17:36:04 +01:00
|
|
|
// close channel
|
|
|
|
closed chan bool
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// a map of sockets based on Micro-Tunnel-Id
|
2019-07-10 17:36:04 +01:00
|
|
|
sockets map[string]*socket
|
|
|
|
}
|
|
|
|
|
2019-07-10 19:01:24 +01:00
|
|
|
// create new tunnel on top of a link
|
2019-07-10 17:36:04 +01:00
|
|
|
func newTunnel(link link.Link) *tun {
|
|
|
|
return &tun{
|
|
|
|
link: link,
|
|
|
|
send: make(chan *message, 128),
|
|
|
|
closed: make(chan bool),
|
|
|
|
sockets: make(map[string]*socket),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-10 19:01:24 +01:00
|
|
|
// getSocket returns a socket from the internal socket map.
|
|
|
|
// It does this based on the Micro-Tunnel-Id and Micro-Tunnel-Session
|
|
|
|
func (t *tun) getSocket(id, session string) (*socket, bool) {
|
2019-07-10 17:36:04 +01:00
|
|
|
// get the socket
|
|
|
|
t.RLock()
|
2019-07-10 19:01:24 +01:00
|
|
|
s, ok := t.sockets[id+session]
|
2019-07-10 17:36:04 +01:00
|
|
|
t.RUnlock()
|
|
|
|
return s, ok
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// newSocket creates a new socket and saves it
|
2019-07-10 19:01:24 +01:00
|
|
|
func (t *tun) newSocket(id, session string) (*socket, bool) {
|
2019-07-10 17:36:04 +01:00
|
|
|
// hash the id
|
|
|
|
h := sha256.New()
|
|
|
|
h.Write([]byte(id))
|
|
|
|
id = fmt.Sprintf("%x", h.Sum(nil))
|
|
|
|
|
|
|
|
// new socket
|
|
|
|
s := &socket{
|
2019-07-10 18:24:03 +01:00
|
|
|
id: id,
|
2019-07-10 19:01:24 +01:00
|
|
|
session: session,
|
2019-07-10 18:24:03 +01:00
|
|
|
closed: make(chan bool),
|
|
|
|
recv: make(chan *message, 128),
|
|
|
|
send: t.send,
|
2019-07-10 17:36:04 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// save socket
|
|
|
|
t.Lock()
|
2019-07-10 19:01:24 +01:00
|
|
|
_, ok := t.sockets[id+session]
|
|
|
|
if ok {
|
|
|
|
// socket already exists
|
|
|
|
t.Unlock()
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
t.sockets[id+session] = s
|
2019-07-10 17:36:04 +01:00
|
|
|
t.Unlock()
|
|
|
|
|
|
|
|
// return socket
|
2019-07-10 19:01:24 +01:00
|
|
|
return s, true
|
2019-07-10 17:36:04 +01:00
|
|
|
}
|
|
|
|
|
2019-07-10 18:24:03 +01:00
|
|
|
// TODO: use tunnel id as part of the session
|
|
|
|
func (t *tun) newSession() string {
|
|
|
|
return uuid.New().String()
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// process outgoing messages sent by all local sockets
|
2019-07-10 17:36:04 +01:00
|
|
|
func (t *tun) process() {
|
|
|
|
// manage the send buffer
|
|
|
|
// all pseudo sockets throw everything down this
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msg := <-t.send:
|
|
|
|
nmsg := &transport.Message{
|
|
|
|
Header: msg.data.Header,
|
|
|
|
Body: msg.data.Body,
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:24:03 +01:00
|
|
|
// set the tunnel id on the outgoing message
|
|
|
|
nmsg.Header["Micro-Tunnel-Id"] = msg.id
|
|
|
|
|
|
|
|
// set the session id
|
|
|
|
nmsg.Header["Micro-Tunnel-Session"] = msg.session
|
2019-07-10 17:36:04 +01:00
|
|
|
|
|
|
|
// send the message via the interface
|
|
|
|
if err := t.link.Send(nmsg); err != nil {
|
|
|
|
// no op
|
|
|
|
// TODO: do something
|
|
|
|
}
|
|
|
|
case <-t.closed:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// process incoming messages
|
|
|
|
func (t *tun) listen() {
|
|
|
|
for {
|
|
|
|
// process anything via the net interface
|
|
|
|
msg := new(transport.Message)
|
|
|
|
err := t.link.Recv(msg)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:24:03 +01:00
|
|
|
// the tunnel id
|
|
|
|
id := msg.Header["Micro-Tunnel-Id"]
|
|
|
|
|
|
|
|
// the session id
|
|
|
|
session := msg.Header["Micro-Tunnel-Session"]
|
2019-07-10 17:36:04 +01:00
|
|
|
|
2019-07-10 19:17:36 +01:00
|
|
|
// if the session id is blank there's nothing we can do
|
|
|
|
// TODO: check this is the case, is there any reason
|
|
|
|
// why we'd have a blank session? Is the tunnel
|
|
|
|
// used for some other purpose?
|
2019-07-10 19:01:24 +01:00
|
|
|
if len(session) == 0 {
|
2019-07-10 19:17:36 +01:00
|
|
|
continue
|
2019-07-10 19:01:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// get the socket based on the tunnel id and session
|
|
|
|
// this could be something we dialed in which case
|
|
|
|
// we have a session for it otherwise its a listener
|
|
|
|
s, exists := t.getSocket(id, session)
|
2019-07-10 17:36:04 +01:00
|
|
|
if !exists {
|
2019-07-10 19:17:36 +01:00
|
|
|
// try get it based on just the tunnel id
|
|
|
|
// the assumption here is that a listener
|
|
|
|
// has no session but its set a listener session
|
|
|
|
s, exists = t.getSocket(id, "listener")
|
|
|
|
if !exists {
|
|
|
|
conti
|
|
|
|
|
|
|
|
// drop it, we don't care about
|
|
|
|
// messages we don't know about
|
|
|
|
continue
|
|
|
|
}
|
2019-07-10 17:36:04 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// is the socket closed?
|
|
|
|
select {
|
|
|
|
case <-s.closed:
|
|
|
|
// closed
|
|
|
|
delete(t.sockets, id)
|
|
|
|
continue
|
|
|
|
default:
|
|
|
|
// process
|
|
|
|
}
|
|
|
|
|
|
|
|
// is the socket new?
|
|
|
|
select {
|
2019-07-10 18:35:10 +01:00
|
|
|
// if its new the socket is actually blocked waiting
|
|
|
|
// for a connection. so we check if its waiting.
|
2019-07-10 17:36:04 +01:00
|
|
|
case <-s.wait:
|
2019-07-10 18:35:10 +01:00
|
|
|
// if its waiting e.g its new then we close it
|
2019-07-10 17:36:04 +01:00
|
|
|
default:
|
|
|
|
// set remote address of the socket
|
|
|
|
s.remote = msg.Header["Remote"]
|
|
|
|
close(s.wait)
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// construct a new transport message
|
2019-07-10 17:36:04 +01:00
|
|
|
tmsg := &transport.Message{
|
|
|
|
Header: msg.Header,
|
|
|
|
Body: msg.Body,
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:35:10 +01:00
|
|
|
// construct the internal message
|
|
|
|
imsg := &message{
|
2019-07-10 19:01:24 +01:00
|
|
|
id: id,
|
2019-07-10 18:35:10 +01:00
|
|
|
session: session,
|
2019-07-10 19:01:24 +01:00
|
|
|
data: tmsg,
|
2019-07-10 18:35:10 +01:00
|
|
|
}
|
|
|
|
|
2019-07-10 17:36:04 +01:00
|
|
|
// append to recv backlog
|
2019-07-10 18:35:10 +01:00
|
|
|
// we don't block if we can't pass it on
|
|
|
|
select {
|
|
|
|
case s.recv <- imsg:
|
|
|
|
default:
|
|
|
|
}
|
2019-07-10 17:36:04 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close the tunnel
|
|
|
|
func (t *tun) Close() error {
|
|
|
|
t.Lock()
|
|
|
|
defer t.Unlock()
|
|
|
|
|
|
|
|
if !t.connected {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-t.closed:
|
|
|
|
return nil
|
|
|
|
default:
|
|
|
|
// close all the sockets
|
|
|
|
for _, s := range t.sockets {
|
|
|
|
s.Close()
|
|
|
|
}
|
|
|
|
// close the connection
|
|
|
|
close(t.closed)
|
|
|
|
t.connected = false
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Connect the tunnel
|
|
|
|
func (t *tun) Connect() error {
|
|
|
|
t.Lock()
|
|
|
|
defer t.Unlock()
|
|
|
|
|
|
|
|
// already connected
|
|
|
|
if t.connected {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// set as connected
|
|
|
|
t.connected = true
|
|
|
|
// create new close channel
|
|
|
|
t.closed = make(chan bool)
|
|
|
|
|
|
|
|
// process messages to be sent
|
|
|
|
go t.process()
|
|
|
|
// process incoming messages
|
|
|
|
go t.listen()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Dial an address
|
|
|
|
func (t *tun) Dial(addr string) (Conn, error) {
|
2019-07-10 19:01:24 +01:00
|
|
|
c, ok := t.newSocket(addr, t.newSession())
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.New("error dialing " + addr)
|
|
|
|
}
|
2019-07-10 17:36:04 +01:00
|
|
|
// set remote
|
|
|
|
c.remote = addr
|
|
|
|
// set local
|
|
|
|
c.local = t.link.Local()
|
2019-07-10 18:24:03 +01:00
|
|
|
|
2019-07-10 17:36:04 +01:00
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
2019-07-10 18:24:03 +01:00
|
|
|
// Accept a connection on the address
|
|
|
|
func (t *tun) Listen(addr string) (Listener, error) {
|
2019-07-10 17:36:04 +01:00
|
|
|
// create a new socket by hashing the address
|
2019-07-10 19:01:24 +01:00
|
|
|
c, ok := t.newSocket(addr, "listener")
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.New("already listening on " + addr)
|
|
|
|
}
|
|
|
|
|
2019-07-10 17:36:04 +01:00
|
|
|
// set remote. it will be replaced by the first message received
|
|
|
|
c.remote = t.link.Remote()
|
|
|
|
// set local
|
|
|
|
c.local = addr
|
|
|
|
|
2019-07-10 18:24:03 +01:00
|
|
|
tl := &tunListener{
|
|
|
|
addr: addr,
|
|
|
|
// the accept channel
|
|
|
|
accept: make(chan *socket, 128),
|
|
|
|
// the channel to close
|
|
|
|
closed: make(chan bool),
|
|
|
|
// the connection
|
|
|
|
conn: c,
|
|
|
|
// the listener socket
|
|
|
|
socket: c,
|
|
|
|
}
|
|
|
|
|
2019-07-10 19:13:50 +01:00
|
|
|
// this kicks off the internal message processor
|
|
|
|
// for the listener so it can create pseudo sockets
|
|
|
|
// per session if they do not exist or pass messages
|
|
|
|
// to the existign sessions
|
2019-07-10 18:24:03 +01:00
|
|
|
go tl.process()
|
|
|
|
|
|
|
|
// return the listener
|
|
|
|
return tl, nil
|
2019-07-10 17:36:04 +01:00
|
|
|
}
|