diff --git a/network/options.go b/network/options.go index 7fcf6f6e..856f647b 100644 --- a/network/options.go +++ b/network/options.go @@ -9,6 +9,7 @@ import ( "github.com/micro/go-micro/v3/router" regRouter "github.com/micro/go-micro/v3/router/registry" "github.com/micro/go-micro/v3/tunnel" + tmucp "github.com/micro/go-micro/v3/tunnel/mucp" ) type Option func(*Options) @@ -104,7 +105,7 @@ func DefaultOptions() Options { Id: uuid.New().String(), Name: "go.micro", Address: ":0", - Tunnel: tunnel.NewTunnel(), + Tunnel: tmucp.NewTunnel(), Router: regRouter.NewRouter(), Proxy: mucp.NewProxy(), Resolver: new(noop.Resolver), diff --git a/tunnel/broker/broker.go b/tunnel/broker/broker.go index 49593845..7bfc0757 100644 --- a/tunnel/broker/broker.go +++ b/tunnel/broker/broker.go @@ -7,6 +7,7 @@ import ( "github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/tunnel" + "github.com/micro/go-micro/v3/tunnel/mucp" ) type tunBroker struct { @@ -176,7 +177,7 @@ func NewBroker(opts ...broker.Option) broker.Broker { } t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) if !ok { - t = tunnel.NewTunnel() + t = mucp.NewTunnel() } a, ok := options.Context.Value(tunnelAddr{}).(string) diff --git a/tunnel/crypto.go b/tunnel/mucp/crypto.go similarity index 95% rename from tunnel/crypto.go rename to tunnel/mucp/crypto.go index 9f70c444..534e96f2 100644 --- a/tunnel/crypto.go +++ b/tunnel/mucp/crypto.go @@ -1,4 +1,4 @@ -package tunnel +package mucp import ( "crypto/aes" @@ -6,6 +6,7 @@ import ( "crypto/rand" "crypto/sha256" + "github.com/micro/go-micro/v3/tunnel" "github.com/oxtoacart/bpool" ) @@ -68,7 +69,7 @@ func Decrypt(gcm cipher.AEAD, data []byte) ([]byte, error) { nonceSize := gcm.NonceSize() if len(data) < nonceSize { - return nil, ErrDecryptingData + return nil, tunnel.ErrDecryptingData } // NOTE: we need to parse out nonce from the payload diff --git a/tunnel/crypto_test.go b/tunnel/mucp/crypto_test.go similarity index 98% rename from tunnel/crypto_test.go rename to tunnel/mucp/crypto_test.go index af0e6a28..41c20fa6 100644 --- a/tunnel/crypto_test.go +++ b/tunnel/mucp/crypto_test.go @@ -1,4 +1,4 @@ -package tunnel +package mucp import ( "bytes" diff --git a/tunnel/link.go b/tunnel/mucp/link.go similarity index 99% rename from tunnel/link.go rename to tunnel/mucp/link.go index 0c615e22..aa34f085 100644 --- a/tunnel/link.go +++ b/tunnel/mucp/link.go @@ -1,4 +1,4 @@ -package tunnel +package mucp import ( "bytes" diff --git a/tunnel/listener.go b/tunnel/mucp/listener.go similarity index 94% rename from tunnel/listener.go rename to tunnel/mucp/listener.go index 0d93c5f1..c1d6a305 100644 --- a/tunnel/listener.go +++ b/tunnel/mucp/listener.go @@ -1,10 +1,11 @@ -package tunnel +package mucp import ( "io" "sync" "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/tunnel" ) type tunListener struct { @@ -53,10 +54,10 @@ func (t *tunListener) process() { var linkId string switch t.session.mode { - case Multicast: + case tunnel.Multicast: sessionId = "multicast" linkId = "multicast" - case Broadcast: + case tunnel.Broadcast: sessionId = "broadcast" linkId = "broadcast" default: @@ -123,7 +124,7 @@ func (t *tunListener) process() { switch m.typ { case "close": // don't close multicast sessions - if sess.mode > Unicast { + if sess.mode > tunnel.Unicast { continue } @@ -184,7 +185,7 @@ func (t *tunListener) Close() error { } // Everytime accept is called we essentially block till we get a new connection -func (t *tunListener) Accept() (Session, error) { +func (t *tunListener) Accept() (tunnel.Session, error) { select { // if the session is closed return case <-t.closed: @@ -199,7 +200,7 @@ func (t *tunListener) Accept() (Session, error) { return nil, io.EOF } // return without accept - if c.mode != Unicast { + if c.mode != tunnel.Unicast { return c, nil } // send back the accept diff --git a/tunnel/default.go b/tunnel/mucp/mucp.go similarity index 96% rename from tunnel/default.go rename to tunnel/mucp/mucp.go index fa16a07d..37329bd2 100644 --- a/tunnel/default.go +++ b/tunnel/mucp/mucp.go @@ -1,4 +1,4 @@ -package tunnel +package mucp import ( "errors" @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/micro/go-micro/v3/logger" "github.com/micro/go-micro/v3/transport" + "github.com/micro/go-micro/v3/tunnel" ) var ( @@ -26,7 +27,7 @@ var ( // tun represents a network tunnel type tun struct { - options Options + options tunnel.Options sync.RWMutex @@ -56,9 +57,9 @@ type tun struct { } // create new tunnel on top of a link -func newTunnel(opts ...Option) *tun { +func NewTunnel(opts ...tunnel.Option) *tun { rand.Seed(time.Now().UnixNano()) - options := DefaultOptions() + options := tunnel.DefaultOptions() for _, o := range opts { o(&options) } @@ -75,7 +76,7 @@ func newTunnel(opts ...Option) *tun { } // Init initializes tunnel options -func (t *tun) Init(opts ...Option) error { +func (t *tun) Init(opts ...tunnel.Option) error { t.Lock() for _, o := range opts { o(&t.options) @@ -410,7 +411,7 @@ func (t *tun) process() { if logger.V(logger.DebugLevel, log) { log.Debugf("Link for node %s not connected", id) } - err = ErrLinkDisconnected + err = tunnel.ErrLinkDisconnected continue } @@ -418,19 +419,19 @@ func (t *tun) process() { // and the message is being sent outbound via // a dialled connection don't use this link if loopback && msg.outbound { - err = ErrLinkLoopback + err = tunnel.ErrLinkLoopback continue } // if the message was being returned by the loopback listener // send it back up the loopback link only if msg.loopback && !loopback { - err = ErrLinkRemote + err = tunnel.ErrLinkRemote continue } // check the multicast mappings - if msg.mode == Multicast { + if msg.mode == tunnel.Multicast { // channel mapping not found in link if !exists { continue @@ -440,7 +441,7 @@ func (t *tun) process() { // this is where we explicitly set the link // in a message received via the listen method if len(msg.link) > 0 && id != msg.link { - err = ErrLinkNotFound + err = tunnel.ErrLinkNotFound continue } } @@ -527,7 +528,7 @@ func (t *tun) sendTo(links []*link, msg *message) error { } // blast it in a go routine since its multicast/broadcast - if msg.mode > Unicast { + if msg.mode > tunnel.Unicast { // make a copy m := &transport.Message{ Header: make(map[string]string), @@ -705,7 +706,7 @@ func (t *tun) listen(link *link) { if exists && !loopback { // only delete the session if its unicast // otherwise ignore close on the multicast - if s.mode == Unicast { + if s.mode == tunnel.Unicast { // only delete this if its unicast // but not if its a loopback conn t.delSession(channel, sessionId) @@ -730,7 +731,7 @@ func (t *tun) listen(link *link) { case "accept": s, exists := t.getSession(channel, sessionId) // just set accepted on anything not unicast - if exists && s.mode > Unicast { + if exists && s.mode > tunnel.Unicast { s.accepted = true continue } @@ -1166,10 +1167,10 @@ func (t *tun) Close() error { } // Dial an address -func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { +func (t *tun) Dial(channel string, opts ...tunnel.DialOption) (tunnel.Session, error) { // get the options - options := DialOptions{ - Timeout: DefaultDialTimeout, + options := tunnel.DialOptions{ + Timeout: tunnel.DefaultDialTimeout, Wait: true, } @@ -1239,9 +1240,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // delete session and return error t.delSession(c.channel, c.session) if logger.V(logger.DebugLevel, log) { - log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) + log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, tunnel.ErrLinkNotFound) } - return nil, ErrLinkNotFound + return nil, tunnel.ErrLinkNotFound } // assume discovered because we picked @@ -1256,7 +1257,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } // discovered so set the link if not multicast - if c.discovered && c.mode == Unicast { + if c.discovered && c.mode == tunnel.Unicast { // pick a link if not specified if len(c.link) == 0 { // pickLink will pick the best link @@ -1300,7 +1301,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // return early if its not unicast // we will not wait for "open" for multicast // and we will not wait it told not to - if c.mode != Unicast || !options.Wait { + if c.mode != tunnel.Unicast || !options.Wait { return c, nil } @@ -1341,11 +1342,11 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { } // Accept a connection on the address -func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { +func (t *tun) Listen(channel string, opts ...tunnel.ListenOption) (tunnel.Listener, error) { if logger.V(logger.DebugLevel, log) { log.Debugf("Tunnel listening on %s", channel) } - options := ListenOptions{ + options := tunnel.ListenOptions{ // Read timeout defaults to never Timeout: time.Duration(-1), } @@ -1405,9 +1406,9 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { return tl, nil } -func (t *tun) Links() []Link { +func (t *tun) Links() []tunnel.Link { t.RLock() - links := make([]Link, 0, len(t.links)) + links := make([]tunnel.Link, 0, len(t.links)) for _, link := range t.links { links = append(links, link) diff --git a/tunnel/tunnel_test.go b/tunnel/mucp/mucp_test.go similarity index 87% rename from tunnel/tunnel_test.go rename to tunnel/mucp/mucp_test.go index aa6627aa..7be28ec6 100644 --- a/tunnel/tunnel_test.go +++ b/tunnel/mucp/mucp_test.go @@ -1,4 +1,4 @@ -package tunnel +package mucp import ( "os" @@ -7,9 +7,10 @@ import ( "time" "github.com/micro/go-micro/v3/transport" + "github.com/micro/go-micro/v3/tunnel" ) -func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { +func testBrokenTunAccept(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup) { defer wg.Done() // listen on some virtual address @@ -52,7 +53,7 @@ func testBrokenTunAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.Wait wait <- true } -func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) { +func testBrokenTunSend(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup, reconnect time.Duration) { defer wg.Done() // wait for the listener to get ready @@ -94,7 +95,7 @@ func testBrokenTunSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGr } // testAccept will accept connections on the transport, create a new link and tunnel on top -func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { +func testAccept(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup) { defer wg.Done() // listen on some virtual address @@ -135,7 +136,7 @@ func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { } // testSend will create a new link to an address and then a tunnel on top -func testSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { +func testSend(t *testing.T, tun tunnel.Tunnel, wait chan bool, wg *sync.WaitGroup) { defer wg.Done() // wait for the listener to get ready @@ -175,13 +176,13 @@ func testSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func TestTunnel(t *testing.T) { // create a new tunnel client tunA := NewTunnel( - Address("127.0.0.1:9096"), - Nodes("127.0.0.1:9097"), + tunnel.Address("127.0.0.1:9096"), + tunnel.Nodes("127.0.0.1:9097"), ) // create a new tunnel server tunB := NewTunnel( - Address("127.0.0.1:9097"), + tunnel.Address("127.0.0.1:9097"), ) // start tunB @@ -217,8 +218,8 @@ func TestTunnel(t *testing.T) { func TestLoopbackTunnel(t *testing.T) { // create a new tunnel tun := NewTunnel( - Address("127.0.0.1:9096"), - Nodes("127.0.0.1:9096"), + tunnel.Address("127.0.0.1:9096"), + tunnel.Nodes("127.0.0.1:9096"), ) // start tunnel @@ -249,13 +250,13 @@ func TestLoopbackTunnel(t *testing.T) { func TestTunnelRTTRate(t *testing.T) { // create a new tunnel client tunA := NewTunnel( - Address("127.0.0.1:9096"), - Nodes("127.0.0.1:9097"), + tunnel.Address("127.0.0.1:9096"), + tunnel.Nodes("127.0.0.1:9097"), ) // create a new tunnel server tunB := NewTunnel( - Address("127.0.0.1:9097"), + tunnel.Address("127.0.0.1:9097"), ) // start tunB @@ -306,13 +307,13 @@ func TestReconnectTunnel(t *testing.T) { // create a new tunnel client tunA := NewTunnel( - Address("127.0.0.1:9098"), - Nodes("127.0.0.1:9099"), + tunnel.Address("127.0.0.1:9098"), + tunnel.Nodes("127.0.0.1:9099"), ) // create a new tunnel server tunB := NewTunnel( - Address("127.0.0.1:9099"), + tunnel.Address("127.0.0.1:9099"), ) // start tunnel diff --git a/tunnel/session.go b/tunnel/mucp/session.go similarity index 96% rename from tunnel/session.go rename to tunnel/mucp/session.go index 8ad0593a..e73d924b 100644 --- a/tunnel/session.go +++ b/tunnel/mucp/session.go @@ -1,4 +1,4 @@ -package tunnel +package mucp import ( "crypto/cipher" @@ -9,6 +9,7 @@ import ( "github.com/micro/go-micro/v3/logger" "github.com/micro/go-micro/v3/transport" + "github.com/micro/go-micro/v3/tunnel" ) // session is our pseudo session for transport.Socket @@ -40,7 +41,7 @@ type session struct { // lookback marks the session as a loopback on the inbound loopback bool // mode of the connection - mode Mode + mode tunnel.Mode // the dial timeout dialTimeout time.Duration // the read timeout @@ -71,7 +72,7 @@ type message struct { // loopback marks the message intended for loopback loopback bool // mode of the connection - mode Mode + mode tunnel.Mode // the link to send the message on link string // transport data @@ -180,7 +181,7 @@ func (s *session) waitFor(msgType string, timeout time.Duration) (*message, erro // got the message return msg, nil case <-after(timeout): - return nil, ErrReadTimeout + return nil, tunnel.ErrReadTimeout case <-s.closed: // check pending message queue select { @@ -214,14 +215,14 @@ func (s *session) Discover() error { // create a new discovery message for this channel msg := s.newMessage("discover") // broadcast the message to all links - msg.mode = Broadcast + msg.mode = tunnel.Broadcast // its an outbound connection since we're dialling msg.outbound = true // don't set the link since we don't know where it is msg.link = "" // if multicast then set that as session - if s.mode == Multicast { + if s.mode == tunnel.Multicast { msg.session = "multicast" } @@ -249,7 +250,7 @@ func (s *session) Discover() error { // wait to hear back about the sent message select { case <-time.After(after()): - return ErrDialTimeout + return tunnel.ErrDialTimeout case err := <-s.errChan: if err != nil { return err @@ -258,7 +259,7 @@ func (s *session) Discover() error { // bail early if its not unicast // we don't need to wait for the announce - if s.mode != Unicast { + if s.mode != tunnel.Unicast { s.discovered = true s.accepted = true return nil @@ -326,7 +327,7 @@ func (s *session) Announce() error { // we don't need an error back msg.errChan = nil // announce to all - msg.mode = Broadcast + msg.mode = tunnel.Broadcast // we don't need the link msg.link = "" @@ -382,7 +383,7 @@ func (s *session) Send(m *transport.Message) error { msg.data = data // if multicast don't set the link - if s.mode != Unicast { + if s.mode != tunnel.Unicast { msg.link = "" } @@ -482,7 +483,7 @@ func (s *session) Close() error { close(s.closed) // don't send close on multicast or broadcast - if s.mode != Unicast { + if s.mode != tunnel.Unicast { return nil } diff --git a/tunnel/transport/transport.go b/tunnel/transport/transport.go index 0cfc5d53..f96b2b30 100644 --- a/tunnel/transport/transport.go +++ b/tunnel/transport/transport.go @@ -6,6 +6,7 @@ import ( "github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/tunnel" + "github.com/micro/go-micro/v3/tunnel/mucp" ) type tunTransport struct { @@ -31,7 +32,7 @@ func (t *tunTransport) Init(opts ...transport.Option) error { // get the tunnel tun, ok := t.options.Context.Value(tunnelKey{}).(tunnel.Tunnel) if !ok { - tun = tunnel.NewTunnel() + tun = mucp.NewTunnel() } // get the transport diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 041f8344..f5f3751d 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -101,7 +101,3 @@ type Session interface { transport.Socket } -// NewTunnel creates a new tunnel -func NewTunnel(opts ...Option) Tunnel { - return newTunnel(opts...) -}