Move message to session

This commit is contained in:
Asim Aslam 2019-09-03 15:56:37 +01:00
parent 3ea4490d6c
commit 6c7582a6be
3 changed files with 23 additions and 22 deletions

View File

@ -91,7 +91,7 @@ func (t *tun) getSession(channel, session string) (*session, bool) {
func (t *tun) newSession(channel, sessionId string) (*session, bool) {
// new session
s := &session{
id: t.id,
tunnel: t.id,
channel: channel,
session: sessionId,
closed: make(chan bool),
@ -180,7 +180,7 @@ func (t *tun) process() {
newMsg.Header["Micro-Tunnel"] = msg.typ
// set the tunnel id on the outgoing message
newMsg.Header["Micro-Tunnel-Id"] = msg.id
newMsg.Header["Micro-Tunnel-Id"] = msg.tunnel
// set the tunnel channel on the outgoing message
newMsg.Header["Micro-Tunnel-Channel"] = msg.channel
@ -292,12 +292,19 @@ func (t *tun) listen(link *link) {
return
}
switch msg.Header["Micro-Tunnel"] {
// message type
mtype := msg.Header["Micro-Tunnel"]
// the tunnel id
id := msg.Header["Micro-Tunnel-Id"]
// the tunnel channel
channel := msg.Header["Micro-Tunnel-Channel"]
// the session id
sessionId := msg.Header["Micro-Tunnel-Session"]
switch mtype {
case "connect":
log.Debugf("Tunnel link %s received connect message", link.Remote())
id := msg.Header["Micro-Tunnel-Id"]
// are we connecting to ourselves?
if id == t.id {
link.loopback = true
@ -326,7 +333,7 @@ func (t *tun) listen(link *link) {
link.lastKeepAlive = time.Now()
t.Unlock()
continue
case "message":
case "session":
// process message
log.Debugf("Received %+v from %s", msg, link.Remote())
default:
@ -340,13 +347,6 @@ func (t *tun) listen(link *link) {
return
}
// the tunnel id
id := msg.Header["Micro-Tunnel-Id"]
// the tunnel channel
channel := msg.Header["Micro-Tunnel-Channel"]
// the session id
sessionId := msg.Header["Micro-Tunnel-Session"]
// strip tunnel message header
for k, _ := range msg.Header {
if strings.HasPrefix(k, "Micro-Tunnel") {
@ -423,7 +423,8 @@ func (t *tun) listen(link *link) {
// construct the internal message
imsg := &message{
id: id,
tunnel: id,
typ: mtype,
channel: channel,
session: sessionId,
data: tmsg,

View File

@ -31,12 +31,12 @@ func (t *tunListener) process() {
case m := <-t.session.recv:
// get a session
sess, ok := conns[m.session]
log.Debugf("Tunnel listener received id %s session %s exists: %t", m.id, m.session, ok)
log.Debugf("Tunnel listener received channel %s session %s exists: %t", m.channel, m.session, ok)
if !ok {
// create a new session session
sess = &session{
// the id of the remote side
id: m.id,
tunnel: m.tunnel,
// the channel
channel: m.channel,
// the session id
@ -73,7 +73,7 @@ func (t *tunListener) process() {
case <-sess.closed:
delete(conns, m.session)
case sess.recv <- m:
log.Debugf("Tunnel listener sent to recv chan id %s session %s", m.id, m.session)
log.Debugf("Tunnel listener sent to recv chan channel %s session %s", m.channel, m.session)
}
}
}

View File

@ -10,8 +10,8 @@ import (
// session is our pseudo session for transport.Socket
type session struct {
// unique id based on the remote tunnel id
id string
// the tunnel id
tunnel string
// the channel name
channel string
// the session id based on Micro.Tunnel-Session
@ -43,7 +43,7 @@ type message struct {
// type of message
typ string
// tunnel id
id string
tunnel string
// channel name
channel string
// the session id
@ -96,8 +96,8 @@ func (s *session) Send(m *transport.Message) error {
// append to backlog
msg := &message{
typ: "message",
id: s.id,
typ: "session",
tunnel: s.tunnel,
channel: s.channel,
session: s.session,
outbound: s.outbound,