Move tunnel to its own package (#1921)

This commit is contained in:
Asim Aslam 2020-08-10 17:31:21 +01:00 committed by GitHub
parent 13f495587e
commit 4db8ea8f6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 72 additions and 68 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/router"
regRouter "github.com/micro/go-micro/v3/router/registry" regRouter "github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/tunnel" "github.com/micro/go-micro/v3/tunnel"
tmucp "github.com/micro/go-micro/v3/tunnel/mucp"
) )
type Option func(*Options) type Option func(*Options)
@ -104,7 +105,7 @@ func DefaultOptions() Options {
Id: uuid.New().String(), Id: uuid.New().String(),
Name: "go.micro", Name: "go.micro",
Address: ":0", Address: ":0",
Tunnel: tunnel.NewTunnel(), Tunnel: tmucp.NewTunnel(),
Router: regRouter.NewRouter(), Router: regRouter.NewRouter(),
Proxy: mucp.NewProxy(), Proxy: mucp.NewProxy(),
Resolver: new(noop.Resolver), Resolver: new(noop.Resolver),

View File

@ -7,6 +7,7 @@ import (
"github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/tunnel" "github.com/micro/go-micro/v3/tunnel"
"github.com/micro/go-micro/v3/tunnel/mucp"
) )
type tunBroker struct { type tunBroker struct {
@ -176,7 +177,7 @@ func NewBroker(opts ...broker.Option) broker.Broker {
} }
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
if !ok { if !ok {
t = tunnel.NewTunnel() t = mucp.NewTunnel()
} }
a, ok := options.Context.Value(tunnelAddr{}).(string) a, ok := options.Context.Value(tunnelAddr{}).(string)

View File

@ -1,4 +1,4 @@
package tunnel package mucp
import ( import (
"crypto/aes" "crypto/aes"
@ -6,6 +6,7 @@ import (
"crypto/rand" "crypto/rand"
"crypto/sha256" "crypto/sha256"
"github.com/micro/go-micro/v3/tunnel"
"github.com/oxtoacart/bpool" "github.com/oxtoacart/bpool"
) )
@ -68,7 +69,7 @@ func Decrypt(gcm cipher.AEAD, data []byte) ([]byte, error) {
nonceSize := gcm.NonceSize() nonceSize := gcm.NonceSize()
if len(data) < nonceSize { if len(data) < nonceSize {
return nil, ErrDecryptingData return nil, tunnel.ErrDecryptingData
} }
// NOTE: we need to parse out nonce from the payload // NOTE: we need to parse out nonce from the payload

View File

@ -1,4 +1,4 @@
package tunnel package mucp
import ( import (
"bytes" "bytes"

View File

@ -1,4 +1,4 @@
package tunnel package mucp
import ( import (
"bytes" "bytes"

View File

@ -1,10 +1,11 @@
package tunnel package mucp
import ( import (
"io" "io"
"sync" "sync"
"github.com/micro/go-micro/v3/logger" "github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/tunnel"
) )
type tunListener struct { type tunListener struct {
@ -53,10 +54,10 @@ func (t *tunListener) process() {
var linkId string var linkId string
switch t.session.mode { switch t.session.mode {
case Multicast: case tunnel.Multicast:
sessionId = "multicast" sessionId = "multicast"
linkId = "multicast" linkId = "multicast"
case Broadcast: case tunnel.Broadcast:
sessionId = "broadcast" sessionId = "broadcast"
linkId = "broadcast" linkId = "broadcast"
default: default:
@ -123,7 +124,7 @@ func (t *tunListener) process() {
switch m.typ { switch m.typ {
case "close": case "close":
// don't close multicast sessions // don't close multicast sessions
if sess.mode > Unicast { if sess.mode > tunnel.Unicast {
continue continue
} }
@ -184,7 +185,7 @@ func (t *tunListener) Close() error {
} }
// Everytime accept is called we essentially block till we get a new connection // Everytime accept is called we essentially block till we get a new connection
func (t *tunListener) Accept() (Session, error) { func (t *tunListener) Accept() (tunnel.Session, error) {
select { select {
// if the session is closed return // if the session is closed return
case <-t.closed: case <-t.closed:
@ -199,7 +200,7 @@ func (t *tunListener) Accept() (Session, error) {
return nil, io.EOF return nil, io.EOF
} }
// return without accept // return without accept
if c.mode != Unicast { if c.mode != tunnel.Unicast {
return c, nil return c, nil
} }
// send back the accept // send back the accept

View File

@ -1,4 +1,4 @@
package tunnel package mucp
import ( import (
"errors" "errors"
@ -10,6 +10,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/v3/logger" "github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/tunnel"
) )
var ( var (
@ -26,7 +27,7 @@ var (
// tun represents a network tunnel // tun represents a network tunnel
type tun struct { type tun struct {
options Options options tunnel.Options
sync.RWMutex sync.RWMutex
@ -56,9 +57,9 @@ type tun struct {
} }
// create new tunnel on top of a link // create new tunnel on top of a link
func newTunnel(opts ...Option) *tun { func NewTunnel(opts ...tunnel.Option) *tun {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
options := DefaultOptions() options := tunnel.DefaultOptions()
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@ -75,7 +76,7 @@ func newTunnel(opts ...Option) *tun {
} }
// Init initializes tunnel options // Init initializes tunnel options
func (t *tun) Init(opts ...Option) error { func (t *tun) Init(opts ...tunnel.Option) error {
t.Lock() t.Lock()
for _, o := range opts { for _, o := range opts {
o(&t.options) o(&t.options)
@ -410,7 +411,7 @@ func (t *tun) process() {
if logger.V(logger.DebugLevel, log) { if logger.V(logger.DebugLevel, log) {
log.Debugf("Link for node %s not connected", id) log.Debugf("Link for node %s not connected", id)
} }
err = ErrLinkDisconnected err = tunnel.ErrLinkDisconnected
continue continue
} }
@ -418,19 +419,19 @@ func (t *tun) process() {
// and the message is being sent outbound via // and the message is being sent outbound via
// a dialled connection don't use this link // a dialled connection don't use this link
if loopback && msg.outbound { if loopback && msg.outbound {
err = ErrLinkLoopback err = tunnel.ErrLinkLoopback
continue continue
} }
// if the message was being returned by the loopback listener // if the message was being returned by the loopback listener
// send it back up the loopback link only // send it back up the loopback link only
if msg.loopback && !loopback { if msg.loopback && !loopback {
err = ErrLinkRemote err = tunnel.ErrLinkRemote
continue continue
} }
// check the multicast mappings // check the multicast mappings
if msg.mode == Multicast { if msg.mode == tunnel.Multicast {
// channel mapping not found in link // channel mapping not found in link
if !exists { if !exists {
continue continue
@ -440,7 +441,7 @@ func (t *tun) process() {
// this is where we explicitly set the link // this is where we explicitly set the link
// in a message received via the listen method // in a message received via the listen method
if len(msg.link) > 0 && id != msg.link { if len(msg.link) > 0 && id != msg.link {
err = ErrLinkNotFound err = tunnel.ErrLinkNotFound
continue continue
} }
} }
@ -527,7 +528,7 @@ func (t *tun) sendTo(links []*link, msg *message) error {
} }
// blast it in a go routine since its multicast/broadcast // blast it in a go routine since its multicast/broadcast
if msg.mode > Unicast { if msg.mode > tunnel.Unicast {
// make a copy // make a copy
m := &transport.Message{ m := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
@ -705,7 +706,7 @@ func (t *tun) listen(link *link) {
if exists && !loopback { if exists && !loopback {
// only delete the session if its unicast // only delete the session if its unicast
// otherwise ignore close on the multicast // otherwise ignore close on the multicast
if s.mode == Unicast { if s.mode == tunnel.Unicast {
// only delete this if its unicast // only delete this if its unicast
// but not if its a loopback conn // but not if its a loopback conn
t.delSession(channel, sessionId) t.delSession(channel, sessionId)
@ -730,7 +731,7 @@ func (t *tun) listen(link *link) {
case "accept": case "accept":
s, exists := t.getSession(channel, sessionId) s, exists := t.getSession(channel, sessionId)
// just set accepted on anything not unicast // just set accepted on anything not unicast
if exists && s.mode > Unicast { if exists && s.mode > tunnel.Unicast {
s.accepted = true s.accepted = true
continue continue
} }
@ -1166,10 +1167,10 @@ func (t *tun) Close() error {
} }
// Dial an address // Dial an address
func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { func (t *tun) Dial(channel string, opts ...tunnel.DialOption) (tunnel.Session, error) {
// get the options // get the options
options := DialOptions{ options := tunnel.DialOptions{
Timeout: DefaultDialTimeout, Timeout: tunnel.DefaultDialTimeout,
Wait: true, Wait: true,
} }
@ -1239,9 +1240,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// delete session and return error // delete session and return error
t.delSession(c.channel, c.session) t.delSession(c.channel, c.session)
if logger.V(logger.DebugLevel, log) { if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, tunnel.ErrLinkNotFound)
} }
return nil, ErrLinkNotFound return nil, tunnel.ErrLinkNotFound
} }
// assume discovered because we picked // assume discovered because we picked
@ -1256,7 +1257,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
// discovered so set the link if not multicast // discovered so set the link if not multicast
if c.discovered && c.mode == Unicast { if c.discovered && c.mode == tunnel.Unicast {
// pick a link if not specified // pick a link if not specified
if len(c.link) == 0 { if len(c.link) == 0 {
// pickLink will pick the best link // pickLink will pick the best link
@ -1300,7 +1301,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// return early if its not unicast // return early if its not unicast
// we will not wait for "open" for multicast // we will not wait for "open" for multicast
// and we will not wait it told not to // and we will not wait it told not to
if c.mode != Unicast || !options.Wait { if c.mode != tunnel.Unicast || !options.Wait {
return c, nil return c, nil
} }
@ -1341,11 +1342,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
// Accept a connection on the address // Accept a connection on the address
func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { func (t *tun) Listen(channel string, opts ...tunnel.ListenOption) (tunnel.Listener, error) {
if logger.V(logger.DebugLevel, log) { if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel listening on %s", channel) log.Debugf("Tunnel listening on %s", channel)
} }
options := ListenOptions{ options := tunnel.ListenOptions{
// Read timeout defaults to never // Read timeout defaults to never
Timeout: time.Duration(-1), Timeout: time.Duration(-1),
} }
@ -1405,9 +1406,9 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
return tl, nil return tl, nil
} }
func (t *tun) Links() []Link { func (t *tun) Links() []tunnel.Link {
t.RLock() t.RLock()
links := make([]Link, 0, len(t.links)) links := make([]tunnel.Link, 0, len(t.links))
for _, link := range t.links { for _, link := range t.links {
links = append(links, link) links = append(links, link)

View File

@ -1,4 +1,4 @@
package tunnel package mucp
import ( import (
"os" "os"
@ -7,9 +7,10 @@ import (
"time" "time"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/tunnel"
) )
func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func testBrokenTunAccept(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
// listen on some virtual address // listen on some virtual address
@ -52,7 +53,7 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait
wait <- true wait <- true
} }
func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) { func testBrokenTunSend(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) {
defer wg.Done() defer wg.Done()
// wait for the listener to get ready // wait for the listener to get ready
@ -94,7 +95,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr
} }
// testAccept will accept connections on the transport, create a new link and tunnel on top // testAccept will accept connections on the transport, create a new link and tunnel on top
func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func testAccept(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
// listen on some virtual address // listen on some virtual address
@ -135,7 +136,7 @@ func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
} }
// testSend will create a new link to an address and then a tunnel on top // testSend will create a new link to an address and then a tunnel on top
func testSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func testSend(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
// wait for the listener to get ready // wait for the listener to get ready
@ -175,13 +176,13 @@ func testSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
func TestTunnel(t *testing.T) { func TestTunnel(t *testing.T) {
// create a new tunnel client // create a new tunnel client
tunA := NewTunnel( tunA := NewTunnel(
Address("127.0.0.1:9096"), tunnel.Address("127.0.0.1:9096"),
Nodes("127.0.0.1:9097"), tunnel.Nodes("127.0.0.1:9097"),
) )
// create a new tunnel server // create a new tunnel server
tunB := NewTunnel( tunB := NewTunnel(
Address("127.0.0.1:9097"), tunnel.Address("127.0.0.1:9097"),
) )
// start tunB // start tunB
@ -217,8 +218,8 @@ func TestTunnel(t *testing.T) {
func TestLoopbackTunnel(t *testing.T) { func TestLoopbackTunnel(t *testing.T) {
// create a new tunnel // create a new tunnel
tun := NewTunnel( tun := NewTunnel(
Address("127.0.0.1:9096"), tunnel.Address("127.0.0.1:9096"),
Nodes("127.0.0.1:9096"), tunnel.Nodes("127.0.0.1:9096"),
) )
// start tunnel // start tunnel
@ -249,13 +250,13 @@ func TestLoopbackTunnel(t *testing.T) {
func TestTunnelRTTRate(t *testing.T) { func TestTunnelRTTRate(t *testing.T) {
// create a new tunnel client // create a new tunnel client
tunA := NewTunnel( tunA := NewTunnel(
Address("127.0.0.1:9096"), tunnel.Address("127.0.0.1:9096"),
Nodes("127.0.0.1:9097"), tunnel.Nodes("127.0.0.1:9097"),
) )
// create a new tunnel server // create a new tunnel server
tunB := NewTunnel( tunB := NewTunnel(
Address("127.0.0.1:9097"), tunnel.Address("127.0.0.1:9097"),
) )
// start tunB // start tunB
@ -306,13 +307,13 @@ func TestReconnectTunnel(t *testing.T) {
// create a new tunnel client // create a new tunnel client
tunA := NewTunnel( tunA := NewTunnel(
Address("127.0.0.1:9098"), tunnel.Address("127.0.0.1:9098"),
Nodes("127.0.0.1:9099"), tunnel.Nodes("127.0.0.1:9099"),
) )
// create a new tunnel server // create a new tunnel server
tunB := NewTunnel( tunB := NewTunnel(
Address("127.0.0.1:9099"), tunnel.Address("127.0.0.1:9099"),
) )
// start tunnel // start tunnel

View File

@ -1,4 +1,4 @@
package tunnel package mucp
import ( import (
"crypto/cipher" "crypto/cipher"
@ -9,6 +9,7 @@ import (
"github.com/micro/go-micro/v3/logger" "github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/tunnel"
) )
// session is our pseudo session for transport.Socket // session is our pseudo session for transport.Socket
@ -40,7 +41,7 @@ type session struct {
// lookback marks the session as a loopback on the inbound // lookback marks the session as a loopback on the inbound
loopback bool loopback bool
// mode of the connection // mode of the connection
mode Mode mode tunnel.Mode
// the dial timeout // the dial timeout
dialTimeout time.Duration dialTimeout time.Duration
// the read timeout // the read timeout
@ -71,7 +72,7 @@ type message struct {
// loopback marks the message intended for loopback // loopback marks the message intended for loopback
loopback bool loopback bool
// mode of the connection // mode of the connection
mode Mode mode tunnel.Mode
// the link to send the message on // the link to send the message on
link string link string
// transport data // transport data
@ -180,7 +181,7 @@ func (s *session) waitFor(msgType string, timeout time.Duration) (*message, erro
// got the message // got the message
return msg, nil return msg, nil
case <-after(timeout): case <-after(timeout):
return nil, ErrReadTimeout return nil, tunnel.ErrReadTimeout
case <-s.closed: case <-s.closed:
// check pending message queue // check pending message queue
select { select {
@ -214,14 +215,14 @@ func (s *session) Discover() error {
// create a new discovery message for this channel // create a new discovery message for this channel
msg := s.newMessage("discover") msg := s.newMessage("discover")
// broadcast the message to all links // broadcast the message to all links
msg.mode = Broadcast msg.mode = tunnel.Broadcast
// its an outbound connection since we're dialling // its an outbound connection since we're dialling
msg.outbound = true msg.outbound = true
// don't set the link since we don't know where it is // don't set the link since we don't know where it is
msg.link = "" msg.link = ""
// if multicast then set that as session // if multicast then set that as session
if s.mode == Multicast { if s.mode == tunnel.Multicast {
msg.session = "multicast" msg.session = "multicast"
} }
@ -249,7 +250,7 @@ func (s *session) Discover() error {
// wait to hear back about the sent message // wait to hear back about the sent message
select { select {
case <-time.After(after()): case <-time.After(after()):
return ErrDialTimeout return tunnel.ErrDialTimeout
case err := <-s.errChan: case err := <-s.errChan:
if err != nil { if err != nil {
return err return err
@ -258,7 +259,7 @@ func (s *session) Discover() error {
// bail early if its not unicast // bail early if its not unicast
// we don't need to wait for the announce // we don't need to wait for the announce
if s.mode != Unicast { if s.mode != tunnel.Unicast {
s.discovered = true s.discovered = true
s.accepted = true s.accepted = true
return nil return nil
@ -326,7 +327,7 @@ func (s *session) Announce() error {
// we don't need an error back // we don't need an error back
msg.errChan = nil msg.errChan = nil
// announce to all // announce to all
msg.mode = Broadcast msg.mode = tunnel.Broadcast
// we don't need the link // we don't need the link
msg.link = "" msg.link = ""
@ -382,7 +383,7 @@ func (s *session) Send(m *transport.Message) error {
msg.data = data msg.data = data
// if multicast don't set the link // if multicast don't set the link
if s.mode != Unicast { if s.mode != tunnel.Unicast {
msg.link = "" msg.link = ""
} }
@ -482,7 +483,7 @@ func (s *session) Close() error {
close(s.closed) close(s.closed)
// don't send close on multicast or broadcast // don't send close on multicast or broadcast
if s.mode != Unicast { if s.mode != tunnel.Unicast {
return nil return nil
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
"github.com/micro/go-micro/v3/tunnel" "github.com/micro/go-micro/v3/tunnel"
"github.com/micro/go-micro/v3/tunnel/mucp"
) )
type tunTransport struct { type tunTransport struct {
@ -31,7 +32,7 @@ func (t *tunTransport) Init(opts ...transport.Option) error {
// get the tunnel // get the tunnel
tun, ok := t.options.Context.Value(tunnelKey{}).(tunnel.Tunnel) tun, ok := t.options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
if !ok { if !ok {
tun = tunnel.NewTunnel() tun = mucp.NewTunnel()
} }
// get the transport // get the transport

View File

@ -101,7 +101,3 @@ type Session interface {
transport.Socket transport.Socket
} }
// NewTunnel creates a new tunnel
func NewTunnel(opts ...Option) Tunnel {
return newTunnel(opts...)
}