Add Tunnel: an interface for stream duplexing over a link
This commit is contained in:
parent
55f8045a70
commit
66c2519696
232
network/tunnel/default.go
Normal file
232
network/tunnel/default.go
Normal file
@ -0,0 +1,232 @@
|
|||||||
|
package tunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/micro/go-micro/network/link"
|
||||||
|
"github.com/micro/go-micro/transport"
|
||||||
|
)
|
||||||
|
|
||||||
|
// tun represents a network tunnel
|
||||||
|
type tun struct {
|
||||||
|
link link.Link
|
||||||
|
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
|
// connect
|
||||||
|
connected bool
|
||||||
|
|
||||||
|
// the send channel
|
||||||
|
send chan *message
|
||||||
|
// close channel
|
||||||
|
closed chan bool
|
||||||
|
|
||||||
|
// sockets
|
||||||
|
sockets map[string]*socket
|
||||||
|
}
|
||||||
|
|
||||||
|
// create new tunnel
|
||||||
|
func newTunnel(link link.Link) *tun {
|
||||||
|
return &tun{
|
||||||
|
link: link,
|
||||||
|
send: make(chan *message, 128),
|
||||||
|
closed: make(chan bool),
|
||||||
|
sockets: make(map[string]*socket),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tun) getSocket(id string) (*socket, bool) {
|
||||||
|
// get the socket
|
||||||
|
t.RLock()
|
||||||
|
s, ok := t.sockets[id]
|
||||||
|
t.RUnlock()
|
||||||
|
return s, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tun) newSocket(id string) *socket {
|
||||||
|
// new id if it doesn't exist
|
||||||
|
if len(id) == 0 {
|
||||||
|
id = uuid.New().String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// hash the id
|
||||||
|
h := sha256.New()
|
||||||
|
h.Write([]byte(id))
|
||||||
|
id = fmt.Sprintf("%x", h.Sum(nil))
|
||||||
|
|
||||||
|
// new socket
|
||||||
|
s := &socket{
|
||||||
|
id: id,
|
||||||
|
closed: make(chan bool),
|
||||||
|
recv: make(chan *message, 128),
|
||||||
|
send: t.send,
|
||||||
|
}
|
||||||
|
|
||||||
|
// save socket
|
||||||
|
t.Lock()
|
||||||
|
t.sockets[id] = s
|
||||||
|
t.Unlock()
|
||||||
|
|
||||||
|
// return socket
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// process outgoing messages
|
||||||
|
func (t *tun) process() {
|
||||||
|
// manage the send buffer
|
||||||
|
// all pseudo sockets throw everything down this
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg := <-t.send:
|
||||||
|
nmsg := &transport.Message{
|
||||||
|
Header: msg.data.Header,
|
||||||
|
Body: msg.data.Body,
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the stream id on the outgoing message
|
||||||
|
nmsg.Header["Micro-Tunnel"] = msg.id
|
||||||
|
|
||||||
|
// send the message via the interface
|
||||||
|
if err := t.link.Send(nmsg); err != nil {
|
||||||
|
// no op
|
||||||
|
// TODO: do something
|
||||||
|
}
|
||||||
|
case <-t.closed:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// process incoming messages
|
||||||
|
func (t *tun) listen() {
|
||||||
|
for {
|
||||||
|
// process anything via the net interface
|
||||||
|
msg := new(transport.Message)
|
||||||
|
err := t.link.Recv(msg)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// a stream id
|
||||||
|
id := msg.Header["Micro-Tunnel"]
|
||||||
|
|
||||||
|
// get the socket
|
||||||
|
s, exists := t.getSocket(id)
|
||||||
|
if !exists {
|
||||||
|
// no op
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// is the socket closed?
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
// closed
|
||||||
|
delete(t.sockets, id)
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
// process
|
||||||
|
}
|
||||||
|
|
||||||
|
// is the socket new?
|
||||||
|
select {
|
||||||
|
// if its new it will block here
|
||||||
|
case <-s.wait:
|
||||||
|
// its not new
|
||||||
|
default:
|
||||||
|
// its new
|
||||||
|
// set remote address of the socket
|
||||||
|
s.remote = msg.Header["Remote"]
|
||||||
|
close(s.wait)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmsg := &transport.Message{
|
||||||
|
Header: msg.Header,
|
||||||
|
Body: msg.Body,
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: don't block on queuing
|
||||||
|
// append to recv backlog
|
||||||
|
s.recv <- &message{id: id, data: tmsg}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the tunnel
|
||||||
|
func (t *tun) Close() error {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
|
||||||
|
if !t.connected {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-t.closed:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
// close all the sockets
|
||||||
|
for _, s := range t.sockets {
|
||||||
|
s.Close()
|
||||||
|
}
|
||||||
|
// close the connection
|
||||||
|
close(t.closed)
|
||||||
|
t.connected = false
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect the tunnel
|
||||||
|
func (t *tun) Connect() error {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
|
||||||
|
// already connected
|
||||||
|
if t.connected {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// set as connected
|
||||||
|
t.connected = true
|
||||||
|
// create new close channel
|
||||||
|
t.closed = make(chan bool)
|
||||||
|
|
||||||
|
// process messages to be sent
|
||||||
|
go t.process()
|
||||||
|
// process incoming messages
|
||||||
|
go t.listen()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial an address
|
||||||
|
func (t *tun) Dial(addr string) (Conn, error) {
|
||||||
|
c := t.newSocket(addr)
|
||||||
|
// set remote
|
||||||
|
c.remote = addr
|
||||||
|
// set local
|
||||||
|
c.local = t.link.Local()
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tun) Accept(addr string) (Conn, error) {
|
||||||
|
// create a new socket by hashing the address
|
||||||
|
c := t.newSocket(addr)
|
||||||
|
// set remote. it will be replaced by the first message received
|
||||||
|
c.remote = t.link.Remote()
|
||||||
|
// set local
|
||||||
|
c.local = addr
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c.closed:
|
||||||
|
return nil, errors.New("error creating socket")
|
||||||
|
// wait for the first message
|
||||||
|
case <-c.wait:
|
||||||
|
}
|
||||||
|
|
||||||
|
// return socket
|
||||||
|
return c, nil
|
||||||
|
}
|
82
network/tunnel/socket.go
Normal file
82
network/tunnel/socket.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
package tunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/transport"
|
||||||
|
)
|
||||||
|
|
||||||
|
// socket is our pseudo socket for transport.Socket
|
||||||
|
type socket struct {
|
||||||
|
// socket id based on Micro-Tunnel
|
||||||
|
id string
|
||||||
|
// closed
|
||||||
|
closed chan bool
|
||||||
|
// remote addr
|
||||||
|
remote string
|
||||||
|
// local addr
|
||||||
|
local string
|
||||||
|
// send chan
|
||||||
|
send chan *message
|
||||||
|
// recv chan
|
||||||
|
recv chan *message
|
||||||
|
// wait until we have a connection
|
||||||
|
wait chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// message is sent over the send channel
|
||||||
|
type message struct {
|
||||||
|
// socket id
|
||||||
|
id string
|
||||||
|
// transport data
|
||||||
|
data *transport.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *socket) Remote() string {
|
||||||
|
return s.remote
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *socket) Local() string {
|
||||||
|
return s.local
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *socket) Id() string {
|
||||||
|
return s.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *socket) Send(m *transport.Message) error {
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
return errors.New("socket is closed")
|
||||||
|
default:
|
||||||
|
// no op
|
||||||
|
}
|
||||||
|
// append to backlog
|
||||||
|
s.send <- &message{id: s.id, data: m}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *socket) Recv(m *transport.Message) error {
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
return errors.New("socket is closed")
|
||||||
|
default:
|
||||||
|
// no op
|
||||||
|
}
|
||||||
|
// recv from backlog
|
||||||
|
msg := <-s.recv
|
||||||
|
// set message
|
||||||
|
*m = *msg.data
|
||||||
|
// return nil
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *socket) Close() error {
|
||||||
|
select {
|
||||||
|
case <-s.closed:
|
||||||
|
// no op
|
||||||
|
default:
|
||||||
|
close(s.closed)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
31
network/tunnel/tunnel.go
Normal file
31
network/tunnel/tunnel.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
// Package tunnel provides a network tunnel
|
||||||
|
package tunnel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/micro/go-micro/network/link"
|
||||||
|
"github.com/micro/go-micro/transport"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Tunnel creates a network tunnel
|
||||||
|
type Tunnel interface {
|
||||||
|
// Connect connects the tunnel
|
||||||
|
Connect() error
|
||||||
|
// Close closes the tunnel
|
||||||
|
Close() error
|
||||||
|
// Dial an endpoint
|
||||||
|
Dial(addr string) (Conn, error)
|
||||||
|
// Accept connections
|
||||||
|
Accept(addr string) (Conn, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Conn interface {
|
||||||
|
// Specifies the tunnel id
|
||||||
|
Id() string
|
||||||
|
// a transport socket
|
||||||
|
transport.Socket
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTunnel creates a new tunnel on top of a link
|
||||||
|
func NewTunnel(l link.Link) Tunnel {
|
||||||
|
return newTunnel(l)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user