From d0d729a7895bf86f352aca15c1327c83ef90ab65 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 11 Aug 2019 18:11:33 +0100 Subject: [PATCH] fix the tunnel execution --- server/rpc_request.go | 5 +++-- server/rpc_server.go | 1 + tunnel/default.go | 9 +++++++-- tunnel/listener.go | 8 +++++++- tunnel/socket.go | 17 ++++++++++++++++- 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/server/rpc_request.go b/server/rpc_request.go index 40995b14..532cf4b1 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -16,6 +16,7 @@ type rpcRequest struct { body []byte rawBody interface{} stream bool + first bool } type rpcMessage struct { @@ -54,9 +55,9 @@ func (r *rpcRequest) Body() interface{} { func (r *rpcRequest) Read() ([]byte, error) { // got a body - if r.body != nil { + if r.first { b := r.body - r.body = nil + r.first = false return b, nil } diff --git a/server/rpc_server.go b/server/rpc_server.go index b68e6425..90b00862 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -146,6 +146,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { body: msg.Body, socket: sock, stream: true, + first: true, } // internal response diff --git a/tunnel/default.go b/tunnel/default.go index 480fd465..f03e08ca 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -128,6 +128,9 @@ func (t *tun) process() { // send the message via the interface t.RLock() + if len(t.links) == 0 { + log.Debugf("Zero links to send to") + } for _, link := range t.links { log.Debugf("Sending %+v to %s", newMsg, link.Remote()) link.Send(newMsg) @@ -146,6 +149,7 @@ func (t *tun) listen(link transport.Socket, listener bool) { msg := new(transport.Message) err := link.Recv(msg) if err != nil { + log.Debugf("Tunnel link %s receive error: %v", link.Remote(), err) return } @@ -185,12 +189,12 @@ func (t *tun) listen(link transport.Socket, listener bool) { // no socket in existence if !exists { - log.Debugf("Skipping") + log.Debugf("Tunnel skipping no socket exists") // drop it, we don't care about // messages we don't know about continue } - log.Debugf("Using socket %s %s", s.id, s.session) + log.Debugf("Tunnel using socket %s %s", s.id, s.session) // is the socket closed? select { @@ -260,6 +264,7 @@ func (t *tun) connect() error { // delete the link defer func() { + log.Debugf("Deleting connection from %s", sock.Remote()) t.Lock() delete(t.links, id) t.Unlock() diff --git a/tunnel/listener.go b/tunnel/listener.go index 368cf4a5..d341d019 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -2,6 +2,8 @@ package tunnel import ( "io" + + "github.com/micro/go-micro/util/log" ) type tunListener struct { @@ -31,6 +33,7 @@ func (t *tunListener) process() { case m := <-t.socket.recv: // get a socket sock, ok := conns[m.session] + log.Debugf("Tunnel listener received id %s session %s exists: %t", m.id, m.session, ok) if !ok { // create a new socket session sock = &socket{ @@ -50,12 +53,14 @@ func (t *tunListener) process() { // save the socket conns[m.session] = sock + sock.recv <- m // send to accept chan select { case <-t.closed: return case t.accept <- sock: + continue } } @@ -64,6 +69,7 @@ func (t *tunListener) process() { case <-sock.closed: delete(conns, m.session) case sock.recv <- m: + log.Debugf("Tunnel listener sent to recv chan id %s session %s", m.id, m.session) } } } @@ -92,7 +98,7 @@ func (t *tunListener) Accept() (Conn, error) { return nil, io.EOF case <-t.tunClosed: // close the listener when the tunnel closes - close(t.closed) + t.Close() return nil, io.EOF // wait for a new connection case c, ok := <-t.accept: diff --git a/tunnel/socket.go b/tunnel/socket.go index ac738efc..8b288c40 100644 --- a/tunnel/socket.go +++ b/tunnel/socket.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/util/log" ) // socket is our pseudo socket for transport.Socket @@ -59,8 +60,21 @@ func (s *socket) Send(m *transport.Message) error { default: // no op } + + // make copy + data := &transport.Message{ + Header: make(map[string]string), + Body: m.Body, + } + + for k, v := range m.Header { + data.Header[k] = v + } + // append to backlog - s.send <- &message{id: s.id, session: s.session, data: m} + msg := &message{id: s.id, session: s.session, data: data} + log.Debugf("Appending %+v to send backlog", msg) + s.send <- msg return nil } @@ -73,6 +87,7 @@ func (s *socket) Recv(m *transport.Message) error { } // recv from backlog msg := <-s.recv + log.Debugf("Received %+v from recv backlog", msg) // set message *m = *msg.data // return nil