Tunnel mode

This commit is contained in:
Asim Aslam 2019-10-15 15:40:04 +01:00
parent 44b794722e
commit 7b1f5584ab
6 changed files with 67 additions and 37 deletions

View File

@ -71,7 +71,7 @@ func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.Publ
} }
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(topic) l, err := t.tunnel.Listen(topic, tunnel.ListenMulticast())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -326,7 +326,7 @@ func (t *tun) process() {
} }
// check the multicast mappings // check the multicast mappings
if msg.multicast { if msg.mode > Unicast {
link.RLock() link.RLock()
_, ok := link.channels[msg.channel] _, ok := link.channels[msg.channel]
link.RUnlock() link.RUnlock()
@ -366,7 +366,7 @@ func (t *tun) process() {
sent = true sent = true
// keep sending broadcast messages // keep sending broadcast messages
if msg.broadcast || msg.multicast { if msg.mode > Unicast {
continue continue
} }
@ -523,7 +523,7 @@ func (t *tun) listen(link *link) {
case "accept": case "accept":
s, exists := t.getSession(channel, sessionId) s, exists := t.getSession(channel, sessionId)
// we don't need this // we don't need this
if exists && s.multicast { if exists && s.mode > Unicast {
s.accepted = true s.accepted = true
continue continue
} }
@ -963,7 +963,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
// set the multicast option // set the multicast option
c.multicast = options.Multicast c.mode = options.Mode
// set the dial timeout // set the dial timeout
c.timeout = options.Timeout c.timeout = options.Timeout
@ -1009,7 +1009,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// discovered so set the link if not multicast // discovered so set the link if not multicast
// TODO: pick the link efficiently based // TODO: pick the link efficiently based
// on link status and saturation. // on link status and saturation.
if c.discovered && !c.multicast { if c.discovered && c.mode == Unicast {
// set the link // set the link
i := rand.Intn(len(links)) i := rand.Intn(len(links))
c.link = links[i] c.link = links[i]
@ -1019,7 +1019,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
if !c.discovered { if !c.discovered {
// create a new discovery message for this channel // create a new discovery message for this channel
msg := c.newMessage("discover") msg := c.newMessage("discover")
msg.broadcast = true msg.mode = Broadcast
msg.outbound = true msg.outbound = true
msg.link = "" msg.link = ""
@ -1041,7 +1041,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
dialTimeout := after() dialTimeout := after()
// set a shorter delay for multicast // set a shorter delay for multicast
if c.multicast { if c.mode > Unicast {
// shorten this // shorten this
dialTimeout = time.Millisecond * 500 dialTimeout = time.Millisecond * 500
} }
@ -1057,7 +1057,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
// if its multicast just go ahead because this is best effort // if its multicast just go ahead because this is best effort
if c.multicast { if c.mode > Unicast {
c.discovered = true c.discovered = true
c.accepted = true c.accepted = true
return c, nil return c, nil
@ -1086,9 +1086,14 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
} }
// Accept a connection on the address // Accept a connection on the address
func (t *tun) Listen(channel string) (Listener, error) { func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
log.Debugf("Tunnel listening on %s", channel) log.Debugf("Tunnel listening on %s", channel)
var options ListenOptions
for _, o := range opts {
o(&options)
}
// create a new session by hashing the address // create a new session by hashing the address
c, ok := t.newSession(channel, "listener") c, ok := t.newSession(channel, "listener")
if !ok { if !ok {
@ -1103,6 +1108,8 @@ func (t *tun) Listen(channel string) (Listener, error) {
c.remote = "remote" c.remote = "remote"
// set local // set local
c.local = channel c.local = channel
// set mode
c.mode = options.Mode
tl := &tunListener{ tl := &tunListener{
channel: channel, channel: channel,

View File

@ -82,8 +82,8 @@ func (t *tunListener) process() {
loopback: m.loopback, loopback: m.loopback,
// the link the message was received on // the link the message was received on
link: m.link, link: m.link,
// set multicast // set the connection mode
multicast: m.multicast, mode: m.mode,
// close chan // close chan
closed: make(chan bool), closed: make(chan bool),
// recv called by the acceptor // recv called by the acceptor

View File

@ -36,12 +36,19 @@ type DialOption func(*DialOptions)
type DialOptions struct { type DialOptions struct {
// Link specifies the link to use // Link specifies the link to use
Link string Link string
// specify a multicast connection // specify mode of the session
Multicast bool Mode Mode
// the dial timeout // the dial timeout
Timeout time.Duration Timeout time.Duration
} }
type ListenOption func(*ListenOptions)
type ListenOptions struct {
// specify mode of the session
Mode Mode
}
// The tunnel id // The tunnel id
func Id(id string) Option { func Id(id string) Option {
return func(o *Options) { return func(o *Options) {
@ -87,12 +94,19 @@ func DefaultOptions() Options {
} }
} }
// Listen options
func ListenMulticast() ListenOption {
return func(o *ListenOptions) {
o.Mode = Multicast
}
}
// Dial options // Dial options
// Dial multicast sets the multicast option to send only to those mapped // Dial multicast sets the multicast option to send only to those mapped
func DialMulticast() DialOption { func DialMulticast() DialOption {
return func(o *DialOptions) { return func(o *DialOptions) {
o.Multicast = true o.Mode = Multicast
} }
} }

View File

@ -37,10 +37,8 @@ type session struct {
outbound bool outbound bool
// lookback marks the session as a loopback on the inbound // lookback marks the session as a loopback on the inbound
loopback bool loopback bool
// if the session is multicast // mode of the connection
multicast bool mode Mode
// if the session is broadcast
broadcast bool
// the timeout // the timeout
timeout time.Duration timeout time.Duration
// the link on which this message was received // the link on which this message was received
@ -63,10 +61,8 @@ type message struct {
outbound bool outbound bool
// loopback marks the message intended for loopback // loopback marks the message intended for loopback
loopback bool loopback bool
// whether to send as multicast // mode of the connection
multicast bool mode Mode
// broadcast sets the broadcast type
broadcast bool
// the link to send the message on // the link to send the message on
link string link string
// transport data // transport data
@ -104,7 +100,7 @@ func (s *session) newMessage(typ string) *message {
session: s.session, session: s.session,
outbound: s.outbound, outbound: s.outbound,
loopback: s.loopback, loopback: s.loopback,
multicast: s.multicast, mode: s.mode,
link: s.link, link: s.link,
errChan: s.errChan, errChan: s.errChan,
} }
@ -128,8 +124,8 @@ func (s *session) Open() error {
return io.EOF return io.EOF
} }
// we don't wait on multicast // don't wait on multicast/broadcast
if s.multicast { if s.mode > Unicast {
s.accepted = true s.accepted = true
return nil return nil
} }
@ -166,6 +162,11 @@ func (s *session) Accept() error {
// no op here // no op here
} }
// don't wait on multicast/broadcast
if s.mode > Unicast {
return nil
}
// wait for send response // wait for send response
select { select {
case err := <-s.errChan: case err := <-s.errChan:
@ -185,7 +186,7 @@ func (s *session) Announce() error {
// we don't need an error back // we don't need an error back
msg.errChan = nil msg.errChan = nil
// announce to all // announce to all
msg.broadcast = true msg.mode = Broadcast
// we don't need the link // we don't need the link
msg.link = "" msg.link = ""
@ -222,7 +223,7 @@ func (s *session) Send(m *transport.Message) error {
msg.data = data msg.data = data
// if multicast don't set the link // if multicast don't set the link
if s.multicast { if s.mode > Unicast {
msg.link = "" msg.link = ""
} }

View File

@ -8,6 +8,12 @@ import (
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
const (
Unicast Mode = iota
Multicast
Broadcast
)
var ( var (
// DefaultDialTimeout is the dial timeout if none is specified // DefaultDialTimeout is the dial timeout if none is specified
DefaultDialTimeout = time.Second * 5 DefaultDialTimeout = time.Second * 5
@ -19,6 +25,8 @@ var (
ErrLinkNotFound = errors.New("link not found") ErrLinkNotFound = errors.New("link not found")
) )
type Mode uint8
// Tunnel creates a gre tunnel on top of the go-micro/transport. // Tunnel creates a gre tunnel on top of the go-micro/transport.
// It establishes multiple streams using the Micro-Tunnel-Channel header // It establishes multiple streams using the Micro-Tunnel-Channel header
// and Micro-Tunnel-Session header. The tunnel id is a hash of // and Micro-Tunnel-Session header. The tunnel id is a hash of
@ -36,7 +44,7 @@ type Tunnel interface {
// Connect to a channel // Connect to a channel
Dial(channel string, opts ...DialOption) (Session, error) Dial(channel string, opts ...DialOption) (Session, error)
// Accept connections on a channel // Accept connections on a channel
Listen(channel string) (Listener, error) Listen(channel string, opts ...ListenOption) (Listener, error)
// Name of the tunnel implementation // Name of the tunnel implementation
String() string String() string
} }