remove the tunnel
This commit is contained in:
@@ -1,229 +0,0 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/network/transport"
|
||||
)
|
||||
|
||||
// tun represents a network tunnel
|
||||
type tun struct {
|
||||
// interface to use
|
||||
net Interface
|
||||
|
||||
// connect
|
||||
mtx sync.RWMutex
|
||||
connected bool
|
||||
|
||||
// the send channel
|
||||
send chan *message
|
||||
// close channel
|
||||
closed chan bool
|
||||
|
||||
// sockets
|
||||
sockets map[string]*socket
|
||||
}
|
||||
|
||||
// create new tunnel
|
||||
func newTunnel(net Interface) *tun {
|
||||
return &tun{
|
||||
net: net,
|
||||
send: make(chan *message, 128),
|
||||
closed: make(chan bool),
|
||||
sockets: make(map[string]*socket),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tun) getSocket(id string) (*socket, bool) {
|
||||
// get the socket
|
||||
t.mtx.RLock()
|
||||
s, ok := t.sockets[id]
|
||||
t.mtx.RUnlock()
|
||||
return s, ok
|
||||
}
|
||||
|
||||
func (t *tun) newSocket(id string) *socket {
|
||||
// new id if it doesn't exist
|
||||
if len(id) == 0 {
|
||||
id = uuid.New().String()
|
||||
}
|
||||
|
||||
// hash the id
|
||||
h := sha256.New()
|
||||
h.Write([]byte(id))
|
||||
id = fmt.Sprintf("%x", h.Sum(nil))
|
||||
|
||||
// new socket
|
||||
s := &socket{
|
||||
id: id,
|
||||
closed: make(chan bool),
|
||||
recv: make(chan *message, 128),
|
||||
send: t.send,
|
||||
}
|
||||
|
||||
// save socket
|
||||
t.mtx.Lock()
|
||||
t.sockets[id] = s
|
||||
t.mtx.Unlock()
|
||||
|
||||
// return socket
|
||||
return s
|
||||
}
|
||||
|
||||
// process outgoing messages
|
||||
func (t *tun) process() {
|
||||
// manage the send buffer
|
||||
// all pseudo sockets throw everything down this
|
||||
for {
|
||||
select {
|
||||
case msg := <-t.send:
|
||||
nmsg := &Message{
|
||||
Header: msg.data.Header,
|
||||
Body: msg.data.Body,
|
||||
}
|
||||
|
||||
// set the stream id on the outgoing message
|
||||
nmsg.Header["Micro-Stream"] = msg.id
|
||||
|
||||
// send the message via the interface
|
||||
if err := t.net.Send(nmsg); err != nil {
|
||||
// no op
|
||||
// TODO: do something
|
||||
}
|
||||
case <-t.closed:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// process incoming messages
|
||||
func (t *tun) listen() {
|
||||
for {
|
||||
// process anything via the net interface
|
||||
msg, err := t.net.Recv()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// a stream id
|
||||
id := msg.Header["Micro-Stream"]
|
||||
|
||||
// get the socket
|
||||
s, exists := t.getSocket(id)
|
||||
if !exists {
|
||||
// no op
|
||||
continue
|
||||
}
|
||||
|
||||
// is the socket closed?
|
||||
select {
|
||||
case <-s.closed:
|
||||
// closed
|
||||
delete(t.sockets, id)
|
||||
continue
|
||||
default:
|
||||
// process
|
||||
}
|
||||
|
||||
// is the socket new?
|
||||
select {
|
||||
// if its new it will block here
|
||||
case <-s.wait:
|
||||
// its not new
|
||||
default:
|
||||
// its new
|
||||
// set remote address of the socket
|
||||
s.remote = msg.Header["Remote"]
|
||||
close(s.wait)
|
||||
}
|
||||
|
||||
tmsg := &transport.Message{
|
||||
Header: msg.Header,
|
||||
Body: msg.Body,
|
||||
}
|
||||
|
||||
// TODO: don't block on queuing
|
||||
// append to recv backlog
|
||||
s.recv <- &message{id: id, data: tmsg}
|
||||
}
|
||||
}
|
||||
|
||||
// Close the tunnel
|
||||
func (t *tun) Close() error {
|
||||
t.mtx.Lock()
|
||||
defer t.mtx.Unlock()
|
||||
|
||||
if !t.connected {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-t.closed:
|
||||
return nil
|
||||
default:
|
||||
// close all the sockets
|
||||
for _, s := range t.sockets {
|
||||
s.Close()
|
||||
}
|
||||
// close the connection
|
||||
close(t.closed)
|
||||
t.connected = false
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connect the tunnel
|
||||
func (t *tun) Connect() error {
|
||||
t.mtx.Lock()
|
||||
defer t.mtx.Unlock()
|
||||
|
||||
// already connected
|
||||
if t.connected {
|
||||
return nil
|
||||
}
|
||||
|
||||
// set as connected
|
||||
t.connected = true
|
||||
// create new close channel
|
||||
t.closed = make(chan bool)
|
||||
|
||||
// process messages to be sent
|
||||
go t.process()
|
||||
// process incoming messages
|
||||
go t.listen()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dial an address
|
||||
func (t *tun) Dial(addr string) (Conn, error) {
|
||||
c := t.newSocket(addr)
|
||||
// set remote
|
||||
c.remote = addr
|
||||
// set local
|
||||
c.local = t.net.Addr()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (t *tun) Accept(addr string) (Conn, error) {
|
||||
c := t.newSocket(addr)
|
||||
// set remote
|
||||
c.remote = t.net.Addr()
|
||||
// set local
|
||||
c.local = addr
|
||||
|
||||
select {
|
||||
case <-c.closed:
|
||||
return nil, errors.New("error creating socket")
|
||||
// wait for the first message
|
||||
case <-c.wait:
|
||||
}
|
||||
|
||||
// return socket
|
||||
return c, nil
|
||||
}
|
||||
Reference in New Issue
Block a user