From 66c2519696832b5dcdb223b6d36df6502874bc80 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 10 Jul 2019 17:36:04 +0100 Subject: [PATCH] Add Tunnel: an interface for stream duplexing over a link --- network/tunnel/default.go | 232 ++++++++++++++++++++++++++++++++++++++ network/tunnel/socket.go | 82 ++++++++++++++ network/tunnel/tunnel.go | 31 +++++ 3 files changed, 345 insertions(+) create mode 100644 network/tunnel/default.go create mode 100644 network/tunnel/socket.go create mode 100644 network/tunnel/tunnel.go diff --git a/network/tunnel/default.go b/network/tunnel/default.go new file mode 100644 index 00000000..c9f274f0 --- /dev/null +++ b/network/tunnel/default.go @@ -0,0 +1,232 @@ +package tunnel + +import ( + "crypto/sha256" + "errors" + "fmt" + "sync" + + "github.com/google/uuid" + "github.com/micro/go-micro/network/link" + "github.com/micro/go-micro/transport" +) + +// tun represents a network tunnel +type tun struct { + link link.Link + + sync.RWMutex + + // connect + connected bool + + // the send channel + send chan *message + // close channel + closed chan bool + + // sockets + sockets map[string]*socket +} + +// create new tunnel +func newTunnel(link link.Link) *tun { + return &tun{ + link: link, + send: make(chan *message, 128), + closed: make(chan bool), + sockets: make(map[string]*socket), + } +} + +func (t *tun) getSocket(id string) (*socket, bool) { + // get the socket + t.RLock() + s, ok := t.sockets[id] + t.RUnlock() + return s, ok +} + +func (t *tun) newSocket(id string) *socket { + // new id if it doesn't exist + if len(id) == 0 { + id = uuid.New().String() + } + + // hash the id + h := sha256.New() + h.Write([]byte(id)) + id = fmt.Sprintf("%x", h.Sum(nil)) + + // new socket + s := &socket{ + id: id, + closed: make(chan bool), + recv: make(chan *message, 128), + send: t.send, + } + + // save socket + t.Lock() + t.sockets[id] = s + t.Unlock() + + // return socket + return s +} + +// process outgoing messages +func (t *tun) process() { + // manage the send buffer + // all pseudo sockets throw everything down this + for { + select { + case msg := <-t.send: + nmsg := &transport.Message{ + Header: msg.data.Header, + Body: msg.data.Body, + } + + // set the stream id on the outgoing message + nmsg.Header["Micro-Tunnel"] = msg.id + + // send the message via the interface + if err := t.link.Send(nmsg); err != nil { + // no op + // TODO: do something + } + case <-t.closed: + return + } + } +} + +// process incoming messages +func (t *tun) listen() { + for { + // process anything via the net interface + msg := new(transport.Message) + err := t.link.Recv(msg) + if err != nil { + return + } + + // a stream id + id := msg.Header["Micro-Tunnel"] + + // get the socket + s, exists := t.getSocket(id) + if !exists { + // no op + continue + } + + // is the socket closed? + select { + case <-s.closed: + // closed + delete(t.sockets, id) + continue + default: + // process + } + + // is the socket new? + select { + // if its new it will block here + case <-s.wait: + // its not new + default: + // its new + // set remote address of the socket + s.remote = msg.Header["Remote"] + close(s.wait) + } + + tmsg := &transport.Message{ + Header: msg.Header, + Body: msg.Body, + } + + // TODO: don't block on queuing + // append to recv backlog + s.recv <- &message{id: id, data: tmsg} + } +} + +// Close the tunnel +func (t *tun) Close() error { + t.Lock() + defer t.Unlock() + + if !t.connected { + return nil + } + + select { + case <-t.closed: + return nil + default: + // close all the sockets + for _, s := range t.sockets { + s.Close() + } + // close the connection + close(t.closed) + t.connected = false + } + + return nil +} + +// Connect the tunnel +func (t *tun) Connect() error { + t.Lock() + defer t.Unlock() + + // already connected + if t.connected { + return nil + } + + // set as connected + t.connected = true + // create new close channel + t.closed = make(chan bool) + + // process messages to be sent + go t.process() + // process incoming messages + go t.listen() + + return nil +} + +// Dial an address +func (t *tun) Dial(addr string) (Conn, error) { + c := t.newSocket(addr) + // set remote + c.remote = addr + // set local + c.local = t.link.Local() + return c, nil +} + +func (t *tun) Accept(addr string) (Conn, error) { + // create a new socket by hashing the address + c := t.newSocket(addr) + // set remote. it will be replaced by the first message received + c.remote = t.link.Remote() + // set local + c.local = addr + + select { + case <-c.closed: + return nil, errors.New("error creating socket") + // wait for the first message + case <-c.wait: + } + + // return socket + return c, nil +} diff --git a/network/tunnel/socket.go b/network/tunnel/socket.go new file mode 100644 index 00000000..861e3f4c --- /dev/null +++ b/network/tunnel/socket.go @@ -0,0 +1,82 @@ +package tunnel + +import ( + "errors" + + "github.com/micro/go-micro/transport" +) + +// socket is our pseudo socket for transport.Socket +type socket struct { + // socket id based on Micro-Tunnel + id string + // closed + closed chan bool + // remote addr + remote string + // local addr + local string + // send chan + send chan *message + // recv chan + recv chan *message + // wait until we have a connection + wait chan bool +} + +// message is sent over the send channel +type message struct { + // socket id + id string + // transport data + data *transport.Message +} + +func (s *socket) Remote() string { + return s.remote +} + +func (s *socket) Local() string { + return s.local +} + +func (s *socket) Id() string { + return s.id +} + +func (s *socket) Send(m *transport.Message) error { + select { + case <-s.closed: + return errors.New("socket is closed") + default: + // no op + } + // append to backlog + s.send <- &message{id: s.id, data: m} + return nil +} + +func (s *socket) Recv(m *transport.Message) error { + select { + case <-s.closed: + return errors.New("socket is closed") + default: + // no op + } + // recv from backlog + msg := <-s.recv + // set message + *m = *msg.data + // return nil + return nil +} + +func (s *socket) Close() error { + select { + case <-s.closed: + // no op + default: + close(s.closed) + } + return nil +} diff --git a/network/tunnel/tunnel.go b/network/tunnel/tunnel.go new file mode 100644 index 00000000..837701e4 --- /dev/null +++ b/network/tunnel/tunnel.go @@ -0,0 +1,31 @@ +// Package tunnel provides a network tunnel +package tunnel + +import ( + "github.com/micro/go-micro/network/link" + "github.com/micro/go-micro/transport" +) + +// Tunnel creates a network tunnel +type Tunnel interface { + // Connect connects the tunnel + Connect() error + // Close closes the tunnel + Close() error + // Dial an endpoint + Dial(addr string) (Conn, error) + // Accept connections + Accept(addr string) (Conn, error) +} + +type Conn interface { + // Specifies the tunnel id + Id() string + // a transport socket + transport.Socket +} + +// NewTunnel creates a new tunnel on top of a link +func NewTunnel(l link.Link) Tunnel { + return newTunnel(l) +}