diff --git a/tunnel/default.go b/tunnel/default.go index f1a0eaac..0ab11ea2 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -830,15 +830,13 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // shit fuck if !c.discovered { - t.send <- &message{ - typ: "discover", - tunnel: t.id, - channel: channel, - session: c.session, - broadcast: true, - outbound: true, - errChan: c.errChan, - } + msg := c.newMessage("discover") + msg.broadcast = true + msg.outbound = true + msg.link = "" + + // send the discovery message + t.send <- msg select { case err := <-c.errChan: diff --git a/tunnel/session.go b/tunnel/session.go index 9e431296..eb3cc170 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -91,10 +91,9 @@ func (s *session) Channel() string { return s.channel } -// Open will fire the open message for the session. This is called by the dialler. -func (s *session) Open() error { - msg := &message{ - typ: "open", +func (s *session) newMessage(typ string) *message { + return &message{ + typ: typ, tunnel: s.tunnel, channel: s.channel, session: s.session, @@ -104,6 +103,12 @@ func (s *session) Open() error { link: s.link, errChan: s.errChan, } +} + +// Open will fire the open message for the session. This is called by the dialler. +func (s *session) Open() error { + // create a new message + msg := s.newMessage("open") // send open message s.send <- msg @@ -146,17 +151,7 @@ func (s *session) Open() error { // Accept sends the accept response to an open message from a dialled connection func (s *session) Accept() error { - msg := &message{ - typ: "accept", - tunnel: s.tunnel, - channel: s.channel, - session: s.session, - outbound: s.outbound, - loopback: s.loopback, - multicast: s.multicast, - link: s.link, - errChan: s.errChan, - } + msg := s.newMessage("accept") // send the accept message select { @@ -181,15 +176,11 @@ func (s *session) Accept() error { // Announce sends an announcement to notify that this session exists. This is primarily used by the listener. func (s *session) Announce() error { - msg := &message{ - typ: "announce", - tunnel: s.tunnel, - channel: s.channel, - session: s.session, - outbound: s.outbound, - loopback: s.loopback, - multicast: s.multicast, - } + msg := s.newMessage("announce") + // we don't need an error back + msg.errChan = nil + // we don't need the link + msg.link = "" select { case s.send <- msg: @@ -218,26 +209,14 @@ func (s *session) Send(m *transport.Message) error { data.Header[k] = v } - // append to backlog - msg := &message{ - typ: "session", - tunnel: s.tunnel, - channel: s.channel, - session: s.session, - outbound: s.outbound, - loopback: s.loopback, - multicast: s.multicast, - data: data, - // specify the link on which to send this - // it will be blank for dialled sessions - link: s.link, - // error chan - errChan: s.errChan, - } + // create a new message + msg := s.newMessage("session") + // set the data + msg.data = data - // if not multicast then set link - if !s.multicast { - msg.link = s.link + // if multicast don't set the link + if s.multicast { + msg.link = "" } log.Debugf("Appending %+v to send backlog", msg) @@ -289,16 +268,9 @@ func (s *session) Close() error { close(s.closed) // append to backlog - msg := &message{ - typ: "close", - tunnel: s.tunnel, - channel: s.channel, - session: s.session, - outbound: s.outbound, - loopback: s.loopback, - multicast: s.multicast, - link: s.link, - } + msg := s.newMessage("close") + // no error response on close + msg.errChan = nil // send the close message select { diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 9566f3b9..e76f8437 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -15,7 +15,6 @@ var ( DefaultDialTimeout = time.Second * 5 ) - // Tunnel creates a gre tunnel on top of the go-micro/transport. // It establishes multiple streams using the Micro-Tunnel-Channel header // and Micro-Tunnel-Session header. The tunnel id is a hash of