functioning tunnel/link code
This commit is contained in:
		| @@ -4,6 +4,7 @@ package link | ||||
| import ( | ||||
| 	"io" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/config/options" | ||||
| 	"github.com/micro/go-micro/transport" | ||||
| @@ -109,12 +110,49 @@ func (l *link) process() { | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// messages sent | ||||
| 	i := 0 | ||||
| 	length := 0 | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case m := <-l.sendQueue: | ||||
| 			t := time.Now() | ||||
|  | ||||
| 			// send the message | ||||
| 			if err := l.send(m); err != nil { | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			// get header size, body size and time taken | ||||
| 			hl := len(m.Header) | ||||
| 			bl := len(m.Body) | ||||
| 			d := time.Since(t) | ||||
|  | ||||
| 			// don't calculate on empty messages | ||||
| 			if hl == 0 && bl == 0 { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// increment sent | ||||
| 			i++ | ||||
|  | ||||
| 			// time take to send some bits and bytes | ||||
| 			td := float64(hl+bl) / float64(d.Nanoseconds()) | ||||
| 			// increase the scale | ||||
| 			td += 1 | ||||
|  | ||||
| 			// judge the length | ||||
| 			length = int(td) / (length + int(td)) | ||||
|  | ||||
| 			// every 10 messages update length | ||||
| 			if (i % 10) == 1 { | ||||
| 				// cost average the length | ||||
| 				// save it | ||||
| 				l.Lock() | ||||
| 				l.length = length | ||||
| 				l.Unlock() | ||||
| 			} | ||||
| 		case <-l.closed: | ||||
| 			return | ||||
| 		} | ||||
| @@ -158,7 +196,7 @@ func (l *link) Connect() error { | ||||
| 	// dial the endpoint | ||||
| 	c, err := l.transport.Dial(l.addr) | ||||
| 	if err != nil { | ||||
| 		return nil | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// set the socket | ||||
|   | ||||
| @@ -65,6 +65,7 @@ func (t *tun) newSocket(id, session string) (*socket, bool) { | ||||
| 		closed:  make(chan bool), | ||||
| 		recv:    make(chan *message, 128), | ||||
| 		send:    t.send, | ||||
| 		wait:    make(chan bool), | ||||
| 	} | ||||
|  | ||||
| 	// save socket | ||||
| @@ -126,6 +127,16 @@ func (t *tun) listen() { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// first check Micro-Tunnel | ||||
| 		switch msg.Header["Micro-Tunnel"] { | ||||
| 		case "connect": | ||||
| 			// assuming new connection | ||||
| 			// TODO: do something with this | ||||
| 		case "close": | ||||
| 			// assuming connection closed | ||||
| 			// TODO: do something with this | ||||
| 		} | ||||
|  | ||||
| 		// the tunnel id | ||||
| 		id := msg.Header["Micro-Tunnel-Id"] | ||||
|  | ||||
| @@ -136,7 +147,7 @@ func (t *tun) listen() { | ||||
| 		// TODO: check this is the case, is there any reason | ||||
| 		// why we'd have a blank session? Is the tunnel | ||||
| 		// used for some other purpose? | ||||
| 		if len(session) == 0 { | ||||
| 		if len(id) == 0 || len(session) == 0 { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| @@ -200,6 +211,22 @@ func (t *tun) listen() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (t *tun) connect() error { | ||||
| 	return t.link.Send(&transport.Message{ | ||||
| 		Header: map[string]string{ | ||||
| 			"Micro-Tunnel": "connect", | ||||
| 		}, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (t *tun) close() error { | ||||
| 	return t.link.Send(&transport.Message{ | ||||
| 		Header: map[string]string{ | ||||
| 			"Micro-Tunnel": "close", | ||||
| 		}, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // Close the tunnel | ||||
| func (t *tun) Close() error { | ||||
| 	t.Lock() | ||||
| @@ -220,6 +247,11 @@ func (t *tun) Close() error { | ||||
| 		// close the connection | ||||
| 		close(t.closed) | ||||
| 		t.connected = false | ||||
|  | ||||
| 		// send a close message | ||||
| 		// we don't close the link | ||||
| 		// just the tunnel | ||||
| 		return t.close() | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| @@ -235,6 +267,11 @@ func (t *tun) Connect() error { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// send the connect message | ||||
| 	if err := t.connect(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// set as connected | ||||
| 	t.connected = true | ||||
| 	// create new close channel | ||||
|   | ||||
| @@ -42,6 +42,8 @@ func (t *tunListener) process() { | ||||
| 					recv: make(chan *message, 128), | ||||
| 					// use the internal send buffer | ||||
| 					send: t.socket.send, | ||||
| 					// wait | ||||
| 					wait: make(chan bool), | ||||
| 				} | ||||
|  | ||||
| 				// save the socket | ||||
|   | ||||
		Reference in New Issue
	
	Block a user