Add back the old tunnel interface

This commit is contained in:
Asim Aslam 2019-08-07 18:44:33 +01:00
parent 380d9790e6
commit 117376a922
7 changed files with 675 additions and 126 deletions

View File

@ -1,79 +1,436 @@
package tunnel
import (
"crypto/sha256"
"errors"
"fmt"
"sync"
"github.com/google/uuid"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/util/log"
)
// tun represents a network tunnel
type tun struct {
options Options
sync.RWMutex
tr transport.Transport
options Options
// to indicate if we're connected or not
connected bool
closed chan bool
// the send channel for all messages
send chan *message
// close channel
closed chan bool
// a map of sockets based on Micro-Tunnel-Id
sockets map[string]*socket
// outbound links
links map[string]*link
// listener
listener transport.Listener
}
func newTunnel(opts ...Option) Tunnel {
// initialize default options
options := DefaultOptions()
type link struct {
transport.Socket
id string
}
// create new tunnel on top of a link
func newTunnel(opts ...Option) *tun {
options := DefaultOptions()
for _, o := range opts {
o(&options)
}
// tunnel transport
tr := newTransport()
t := &tun{
tr: tr,
return &tun{
options: options,
send: make(chan *message, 128),
closed: make(chan bool),
sockets: make(map[string]*socket),
links: make(map[string]*link),
}
}
// getSocket returns a socket from the internal socket map.
// It does this based on the Micro-Tunnel-Id and Micro-Tunnel-Session
func (t *tun) getSocket(id, session string) (*socket, bool) {
// get the socket
t.RLock()
s, ok := t.sockets[id+session]
t.RUnlock()
return s, ok
}
// newSocket creates a new socket and saves it
func (t *tun) newSocket(id, session string) (*socket, bool) {
// hash the id
h := sha256.New()
h.Write([]byte(id))
id = fmt.Sprintf("%x", h.Sum(nil))
// new socket
s := &socket{
id: id,
session: session,
closed: make(chan bool),
recv: make(chan *message, 128),
send: t.send,
wait: make(chan bool),
}
return t
// save socket
t.Lock()
_, ok := t.sockets[id+session]
if ok {
// socket already exists
t.Unlock()
return nil, false
}
t.sockets[id+session] = s
t.Unlock()
// return socket
return s, true
}
// Id returns tunnel id
func (t *tun) Id() string {
return t.options.Id
// TODO: use tunnel id as part of the session
func (t *tun) newSession() string {
return uuid.New().String()
}
// Options returns tunnel options
func (t *tun) Options() Options {
return t.options
// process outgoing messages sent by all local sockets
func (t *tun) process() {
// manage the send buffer
// all pseudo sockets throw everything down this
for {
select {
case msg := <-t.send:
nmsg := &transport.Message{
Header: msg.data.Header,
Body: msg.data.Body,
}
// set the tunnel id on the outgoing message
nmsg.Header["Micro-Tunnel-Id"] = msg.id
// set the session id
nmsg.Header["Micro-Tunnel-Session"] = msg.session
// send the message via the interface
t.RLock()
for _, link := range t.links {
link.Send(nmsg)
}
t.RUnlock()
case <-t.closed:
return
}
}
}
// Address returns tunnel listen address
func (t *tun) Address() string {
return t.options.Address
// process incoming messages
func (t *tun) listen(link transport.Socket, listener bool) {
for {
// process anything via the net interface
msg := new(transport.Message)
err := link.Recv(msg)
if err != nil {
return
}
// first check Micro-Tunnel
switch msg.Header["Micro-Tunnel"] {
case "connect":
// assuming new connection
// TODO: do something with this
continue
case "close":
// assuming connection closed
// TODO: do something with this
continue
}
// the tunnel id
id := msg.Header["Micro-Tunnel-Id"]
// the session id
session := msg.Header["Micro-Tunnel-Session"]
// if the session id is blank there's nothing we can do
// TODO: check this is the case, is there any reason
// why we'd have a blank session? Is the tunnel
// used for some other purpose?
if len(id) == 0 || len(session) == 0 {
continue
}
var s *socket
var exists bool
// if its a local listener then we use that as the session id
// e.g we're using a loopback connecting to ourselves
if listener {
s, exists = t.getSocket(id, "listener")
} else {
// get the socket based on the tunnel id and session
// this could be something we dialed in which case
// we have a session for it otherwise its a listener
s, exists = t.getSocket(id, session)
if !exists {
// try get it based on just the tunnel id
// the assumption here is that a listener
// has no session but its set a listener session
s, exists = t.getSocket(id, "listener")
}
}
// no socket in existence
if !exists {
// drop it, we don't care about
// messages we don't know about
continue
}
// is the socket closed?
select {
case <-s.closed:
// closed
delete(t.sockets, id)
continue
default:
// process
}
// is the socket new?
select {
// if its new the socket is actually blocked waiting
// for a connection. so we check if its waiting.
case <-s.wait:
// if its waiting e.g its new then we close it
default:
// set remote address of the socket
s.remote = msg.Header["Remote"]
close(s.wait)
}
// construct a new transport message
tmsg := &transport.Message{
Header: msg.Header,
Body: msg.Body,
}
// construct the internal message
imsg := &message{
id: id,
session: session,
data: tmsg,
}
// append to recv backlog
// we don't block if we can't pass it on
select {
case s.recv <- imsg:
default:
}
}
}
// Transport returns tunnel client transport
func (t *tun) Transport() transport.Transport {
return t.tr
}
func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address)
if err != nil {
return err
}
// save the listener
t.listener = l
go func() {
// accept inbound connections
err := l.Accept(func(sock transport.Socket) {
// save the link
id := uuid.New().String()
t.Lock()
t.links[id] = &link{
Socket: sock,
id: id,
}
t.Unlock()
// delete the link
defer func() {
t.Lock()
delete(t.links, id)
t.Unlock()
}()
// listen for inbound messages
t.listen(sock, true)
})
t.Lock()
defer t.Unlock()
// still connected but the tunnel died
if err != nil && t.connected {
log.Logf("Tunnel listener died: %v", err)
}
}()
for _, node := range t.options.Nodes {
c, err := t.options.Transport.Dial(node)
if err != nil {
log.Debugf("Tunnel failed to connect to %s: %v", node, err)
continue
}
err = c.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "connect",
},
})
if err != nil {
continue
}
// process incoming messages
go t.listen(c, false)
// save the link
id := uuid.New().String()
t.links[id] = &link{
Socket: c,
id: id,
}
}
// process outbound messages to be sent
// process sends to all links
go t.process()
// Connect connects establishes point to point tunnel
func (t *tun) Connect() error {
return nil
}
// Close closes the tunnel
func (t *tun) close() error {
// close all the links
for id, link := range t.links {
link.Send(&transport.Message{
Header: map[string]string{
"Micro-Tunnel": "close",
},
})
link.Close()
delete(t.links, id)
}
// close the listener
return t.listener.Close()
}
// Close the tunnel
func (t *tun) Close() error {
return nil
}
t.Lock()
defer t.Unlock()
if !t.connected {
return nil
}
// Status returns tunnel status
func (t *tun) Status() Status {
select {
case <-t.closed:
return Closed
return nil
default:
return Connected
// close all the sockets
for _, s := range t.sockets {
s.Close()
}
// close the connection
close(t.closed)
t.connected = false
// send a close message
// we don't close the link
// just the tunnel
return t.close()
}
return nil
}
func (t *tun) String() string {
return "micro"
// Connect the tunnel
func (t *tun) Connect() error {
t.Lock()
defer t.Unlock()
// already connected
if t.connected {
return nil
}
// send the connect message
if err := t.connect(); err != nil {
return err
}
// set as connected
t.connected = true
// create new close channel
t.closed = make(chan bool)
return nil
}
// Dial an address
func (t *tun) Dial(addr string) (Conn, error) {
c, ok := t.newSocket(addr, t.newSession())
if !ok {
return nil, errors.New("error dialing " + addr)
}
// set remote
c.remote = addr
// set local
c.local = "local"
return c, nil
}
// Accept a connection on the address
func (t *tun) Listen(addr string) (Listener, error) {
// create a new socket by hashing the address
c, ok := t.newSocket(addr, "listener")
if !ok {
return nil, errors.New("already listening on " + addr)
}
// set remote. it will be replaced by the first message received
c.remote = "remote"
// set local
c.local = addr
tl := &tunListener{
addr: addr,
// the accept channel
accept: make(chan *socket, 128),
// the channel to close
closed: make(chan bool),
// the connection
conn: c,
// the listener socket
socket: c,
}
// this kicks off the internal message processor
// for the listener so it can create pseudo sockets
// per session if they do not exist or pass messages
// to the existign sessions
go tl.process()
// return the listener
return tl, nil
}

101
tunnel/listener.go Normal file
View File

@ -0,0 +1,101 @@
package tunnel
import (
"io"
)
type tunListener struct {
// address of the listener
addr string
// the accept channel
accept chan *socket
// the channel to close
closed chan bool
// the connection
conn Conn
// the listener socket
socket *socket
}
func (t *tunListener) process() {
// our connection map for session
conns := make(map[string]*socket)
for {
select {
case <-t.closed:
return
// receive a new message
case m := <-t.socket.recv:
// get a socket
sock, ok := conns[m.session]
if !ok {
// create a new socket session
sock = &socket{
// our tunnel id
id: m.id,
// the session id
session: m.session,
// close chan
closed: make(chan bool),
// recv called by the acceptor
recv: make(chan *message, 128),
// use the internal send buffer
send: t.socket.send,
// wait
wait: make(chan bool),
}
// first message
sock.recv <- m
// save the socket
conns[m.session] = sock
// send to accept chan
select {
case <-t.closed:
return
case t.accept <- sock:
}
}
// send this to the accept chan
select {
case <-sock.closed:
delete(conns, m.session)
case sock.recv <- m:
}
}
}
}
func (t *tunListener) Addr() string {
return t.addr
}
func (t *tunListener) Close() error {
select {
case <-t.closed:
return nil
default:
close(t.closed)
}
return nil
}
// Everytime accept is called we essentially block till we get a new connection
func (t *tunListener) Accept() (Conn, error) {
select {
// if the socket is closed return
case <-t.closed:
return nil, io.EOF
// wait for a new connection
case c, ok := <-t.accept:
if !ok {
return nil, io.EOF
}
return c, nil
}
return nil, nil
}

View File

@ -3,6 +3,7 @@ package tunnel
import (
"github.com/google/uuid"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/transport/quic"
)
var (
@ -39,7 +40,7 @@ func Address(a string) Option {
}
// Nodes specify remote network nodes
func Nodes(n []string) Option {
func Nodes(n ...string) Option {
return func(o *Options) {
o.Nodes = n
}
@ -57,7 +58,6 @@ func DefaultOptions() Options {
return Options{
Id: uuid.New().String(),
Address: DefaultAddress,
Nodes: make([]string, 0),
Transport: transport.DefaultTransport,
Transport: quic.NewTransport(),
}
}

View File

@ -1,25 +1,90 @@
package tunnel
import "github.com/micro/go-micro/transport"
import (
"errors"
type tunSocket struct{}
"github.com/micro/go-micro/transport"
)
func (s *tunSocket) Recv(m *transport.Message) error {
// socket is our pseudo socket for transport.Socket
type socket struct {
// socket id based on Micro-Tunnel
id string
// the session id based on Micro.Tunnel-Session
session string
// closed
closed chan bool
// remote addr
remote string
// local addr
local string
// send chan
send chan *message
// recv chan
recv chan *message
// wait until we have a connection
wait chan bool
}
// message is sent over the send channel
type message struct {
// tunnel id
id string
// the session id
session string
// transport data
data *transport.Message
}
func (s *socket) Remote() string {
return s.remote
}
func (s *socket) Local() string {
return s.local
}
func (s *socket) Id() string {
return s.id
}
func (s *socket) Session() string {
return s.session
}
func (s *socket) Send(m *transport.Message) error {
select {
case <-s.closed:
return errors.New("socket is closed")
default:
// no op
}
// append to backlog
s.send <- &message{id: s.id, session: s.session, data: m}
return nil
}
func (s *tunSocket) Send(m *transport.Message) error {
func (s *socket) Recv(m *transport.Message) error {
select {
case <-s.closed:
return errors.New("socket is closed")
default:
// no op
}
// recv from backlog
msg := <-s.recv
// set message
*m = *msg.data
// return nil
return nil
}
func (s *tunSocket) Close() error {
func (s *socket) Close() error {
select {
case <-s.closed:
// no op
default:
close(s.closed)
}
return nil
}
func (s *tunSocket) Local() string {
return ""
}
func (s *tunSocket) Remote() string {
return ""
}

View File

@ -1,51 +0,0 @@
package tunnel
import "github.com/micro/go-micro/transport"
type tunTransport struct {
options transport.Options
}
type tunClient struct {
*tunSocket
options transport.DialOptions
}
type tunListener struct {
conn chan *tunSocket
}
func newTransport(opts ...transport.Option) transport.Transport {
var options transport.Options
for _, o := range opts {
o(&options)
}
return &tunTransport{
options: options,
}
}
func (t *tunTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&t.options)
}
return nil
}
func (t *tunTransport) Options() transport.Options {
return t.options
}
func (t *tunTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) {
return nil, nil
}
func (t *tunTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) {
return nil, nil
}
func (t *tunTransport) String() string {
return "micro"
}

View File

@ -1,39 +1,43 @@
// Package tunnel provides micro network tunnelling
// Package tunnel provides gre network tunnelling
package tunnel
import (
"github.com/micro/go-micro/transport"
)
// Status is tunnel status
type Status int
const (
// Connected means the tunnel is alive
Connected Status = iota
// Closed meands the tunnel has been disconnected
Closed
)
// Tunnel creates a p2p network tunnel.
// Tunnel creates a gre network tunnel on top of a link.
// It establishes multiple streams using the Micro-Tunnel-Id header
// and Micro-Tunnel-Session header. The tunnel id is a hash of
// the address being requested.
type Tunnel interface {
// Id returns tunnel id
Id() string
// Options returns the tunnel options
Options() Options
// Address returns tunnel address
Address() string
// Transport to use by tunne clients
Transport() transport.Transport
// Connect connects the tunnel
Connect() error
// Close closes the tunnel
Close() error
// Status returns tunnel status
Status() Status
// Dial an endpoint
Dial(addr string) (Conn, error)
// Accept connections
Listen(addr string) (Listener, error)
}
// NewTunnel creates a new tunnel on top of a link
// The listener provides similar constructs to the transport.Listener
type Listener interface {
Addr() string
Close() error
Accept() (Conn, error)
}
// Conn is a connection dialed or accepted which includes the tunnel id and session
type Conn interface {
// Specifies the tunnel id
Id() string
// The session
Session() string
// a transport socket
transport.Socket
}
// NewTunnel creates a new tunnel
func NewTunnel(opts ...Option) Tunnel {
return newTunnel(opts...)
}

73
tunnel/tunnel_test.go Normal file
View File

@ -0,0 +1,73 @@
package tunnel
import (
"testing"
"github.com/micro/go-micro/transport"
)
// testAccept will accept connections on the transport, create a new link and tunnel on top
func testAccept(t *testing.T, tun Tunnel, wait chan bool) {
// listen on some virtual address
tl, err := tun.Listen("test-tunnel")
if err != nil {
t.Fatal(err)
}
// accept a connection
c, err := tl.Accept()
if err != nil {
t.Fatal(err)
}
// get a message
for {
m := new(transport.Message)
if err := c.Recv(m); err != nil {
t.Fatal(err)
}
close(wait)
return
}
}
// testSend will create a new link to an address and then a tunnel on top
func testSend(t *testing.T, tun Tunnel) {
// dial a new session
c, err := tun.Dial("test-tunnel")
if err != nil {
t.Fatal(err)
}
//defer c.Close()
m := transport.Message{
Header: map[string]string{
"test": "header",
},
}
if err := c.Send(&m); err != nil {
t.Fatal(err)
}
}
func TestTunnel(t *testing.T) {
// create a new listener
tun := NewTunnel(Nodes(":9096"))
err := tun.Connect()
if err != nil {
t.Fatal(err)
}
//defer tun.Close()
wait := make(chan bool)
// start accepting connections
go testAccept(t, tun, wait)
// send a message
testSend(t, tun)
// wait until message is received
<-wait
}