diff --git a/network/link/default.go b/network/link/default.go index f0d01ecd..f6742b83 100644 --- a/network/link/default.go +++ b/network/link/default.go @@ -4,6 +4,7 @@ package link import ( "io" "sync" + "time" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/transport" @@ -109,12 +110,49 @@ func (l *link) process() { } }() + // messages sent + i := 0 + length := 0 + for { select { case m := <-l.sendQueue: + t := time.Now() + + // send the message if err := l.send(m); err != nil { return } + + // get header size, body size and time taken + hl := len(m.Header) + bl := len(m.Body) + d := time.Since(t) + + // don't calculate on empty messages + if hl == 0 && bl == 0 { + continue + } + + // increment sent + i++ + + // time take to send some bits and bytes + td := float64(hl+bl) / float64(d.Nanoseconds()) + // increase the scale + td += 1 + + // judge the length + length = int(td) / (length + int(td)) + + // every 10 messages update length + if (i % 10) == 1 { + // cost average the length + // save it + l.Lock() + l.length = length + l.Unlock() + } case <-l.closed: return } @@ -158,7 +196,7 @@ func (l *link) Connect() error { // dial the endpoint c, err := l.transport.Dial(l.addr) if err != nil { - return nil + return err } // set the socket diff --git a/network/tunnel/default.go b/network/tunnel/default.go index 50bc1130..89e04dd3 100644 --- a/network/tunnel/default.go +++ b/network/tunnel/default.go @@ -65,6 +65,7 @@ func (t *tun) newSocket(id, session string) (*socket, bool) { closed: make(chan bool), recv: make(chan *message, 128), send: t.send, + wait: make(chan bool), } // save socket @@ -126,6 +127,16 @@ func (t *tun) listen() { return } + // first check Micro-Tunnel + switch msg.Header["Micro-Tunnel"] { + case "connect": + // assuming new connection + // TODO: do something with this + case "close": + // assuming connection closed + // TODO: do something with this + } + // the tunnel id id := msg.Header["Micro-Tunnel-Id"] @@ -136,7 +147,7 @@ func (t *tun) listen() { // TODO: check this is the case, is there any reason // why we'd have a blank session? Is the tunnel // used for some other purpose? - if len(session) == 0 { + if len(id) == 0 || len(session) == 0 { continue } @@ -200,6 +211,22 @@ func (t *tun) listen() { } } +func (t *tun) connect() error { + return t.link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "connect", + }, + }) +} + +func (t *tun) close() error { + return t.link.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Tunnel": "close", + }, + }) +} + // Close the tunnel func (t *tun) Close() error { t.Lock() @@ -220,6 +247,11 @@ func (t *tun) Close() error { // close the connection close(t.closed) t.connected = false + + // send a close message + // we don't close the link + // just the tunnel + return t.close() } return nil @@ -235,6 +267,11 @@ func (t *tun) Connect() error { return nil } + // send the connect message + if err := t.connect(); err != nil { + return err + } + // set as connected t.connected = true // create new close channel diff --git a/network/tunnel/listener.go b/network/tunnel/listener.go index 6a8d1ba3..dac315ce 100644 --- a/network/tunnel/listener.go +++ b/network/tunnel/listener.go @@ -42,6 +42,8 @@ func (t *tunListener) process() { recv: make(chan *message, 128), // use the internal send buffer send: t.socket.send, + // wait + wait: make(chan bool), } // save the socket