Merge pull request #755 from micro/tunnel
Add tunnel fixes for quic and keepalive
This commit is contained in:
		| @@ -5,6 +5,7 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
| 	"encoding/gob" | 	"encoding/gob" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/lucas-clemente/quic-go" | 	"github.com/lucas-clemente/quic-go" | ||||||
| 	"github.com/micro/go-micro/transport" | 	"github.com/micro/go-micro/transport" | ||||||
| @@ -43,6 +44,9 @@ func (q *quicSocket) Recv(m *transport.Message) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (q *quicSocket) Send(m *transport.Message) error { | func (q *quicSocket) Send(m *transport.Message) error { | ||||||
|  | 	// set the write deadline | ||||||
|  | 	q.st.SetWriteDeadline(time.Now().Add(time.Second * 10)) | ||||||
|  | 	// send the data | ||||||
| 	return q.enc.Encode(m) | 	return q.enc.Encode(m) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -113,7 +117,10 @@ func (q *quicTransport) Dial(addr string, opts ...transport.DialOption) (transpo | |||||||
| 			NextProtos:         []string{"http/1.1"}, | 			NextProtos:         []string{"http/1.1"}, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	s, err := quic.DialAddr(addr, config, &quic.Config{KeepAlive: true}) | 	s, err := quic.DialAddr(addr, config, &quic.Config{ | ||||||
|  | 		IdleTimeout: time.Minute * 2, | ||||||
|  | 		KeepAlive:   true, | ||||||
|  | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -162,9 +162,7 @@ func (t *tun) announce(channel, session string, link *link) { | |||||||
| 	// if no channel is present we've been asked to discover all channels | 	// if no channel is present we've been asked to discover all channels | ||||||
| 	if len(channel) == 0 { | 	if len(channel) == 0 { | ||||||
| 		// get the list of channels | 		// get the list of channels | ||||||
| 		t.RLock() |  | ||||||
| 		channels := t.listChannels() | 		channels := t.listChannels() | ||||||
| 		t.RUnlock() |  | ||||||
|  |  | ||||||
| 		// if there are no channels continue | 		// if there are no channels continue | ||||||
| 		if len(channels) == 0 { | 		if len(channels) == 0 { | ||||||
| @@ -220,9 +218,8 @@ func (t *tun) monitor() { | |||||||
| 					log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) | 					log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| 				// set the link id to the node | 				// set the link id to the remote address | ||||||
| 				// TODO: hash it | 				link.id = link.Remote() | ||||||
| 				link.id = node |  | ||||||
| 				// save the link | 				// save the link | ||||||
| 				t.Lock() | 				t.Lock() | ||||||
| 				t.links[node] = link | 				t.links[node] = link | ||||||
| @@ -267,7 +264,7 @@ func (t *tun) process() { | |||||||
| 			newMsg.Header["Micro-Tunnel-Token"] = t.token | 			newMsg.Header["Micro-Tunnel-Token"] = t.token | ||||||
|  |  | ||||||
| 			// send the message via the interface | 			// send the message via the interface | ||||||
| 			t.Lock() | 			t.RLock() | ||||||
|  |  | ||||||
| 			if len(t.links) == 0 { | 			if len(t.links) == 0 { | ||||||
| 				log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) | 				log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) | ||||||
| @@ -275,7 +272,9 @@ func (t *tun) process() { | |||||||
|  |  | ||||||
| 			var sent bool | 			var sent bool | ||||||
| 			var err error | 			var err error | ||||||
|  | 			var sendTo []*link | ||||||
|  |  | ||||||
|  | 			// build the list of links ot send to | ||||||
| 			for node, link := range t.links { | 			for node, link := range t.links { | ||||||
| 				// if the link is not connected skip it | 				// if the link is not connected skip it | ||||||
| 				if !link.connected { | 				if !link.connected { | ||||||
| @@ -318,16 +317,21 @@ func (t *tun) process() { | |||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
|  | 				// add to link list | ||||||
|  | 				sendTo = append(sendTo, link) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			t.RUnlock() | ||||||
|  |  | ||||||
|  | 			// send the message | ||||||
|  | 			for _, link := range sendTo { | ||||||
| 				// send the message via the current link | 				// send the message via the current link | ||||||
| 				log.Debugf("Sending %+v to %s", newMsg, node) | 				log.Debugf("Sending %+v to %s", newMsg, link.Remote()) | ||||||
|  |  | ||||||
| 				if errr := link.Send(newMsg); errr != nil { | 				if errr := link.Send(newMsg); errr != nil { | ||||||
| 					log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, errr) | 					log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, link.Remote(), errr) | ||||||
| 					err = errors.New(errr.Error()) | 					err = errors.New(errr.Error()) | ||||||
| 					// kill the link | 					t.delLink(link.Remote()) | ||||||
| 					link.Close() |  | ||||||
| 					// delete the link |  | ||||||
| 					delete(t.links, node) |  | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| @@ -343,10 +347,9 @@ func (t *tun) process() { | |||||||
| 				break | 				break | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			t.Unlock() | 			var gerr error | ||||||
|  |  | ||||||
| 			// set the error if not sent | 			// set the error if not sent | ||||||
| 			var gerr error |  | ||||||
| 			if !sent { | 			if !sent { | ||||||
| 				gerr = err | 				gerr = err | ||||||
| 			} | 			} | ||||||
| @@ -367,17 +370,19 @@ func (t *tun) process() { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t *tun) delLink(id string) { | func (t *tun) delLink(remote string) { | ||||||
| 	t.Lock() | 	t.Lock() | ||||||
| 	defer t.Unlock() | 	defer t.Unlock() | ||||||
|  |  | ||||||
| 	// get the link | 	// get the link | ||||||
| 	link, ok := t.links[id] | 	for id, link := range t.links { | ||||||
| 	if !ok { | 		if link.id != remote { | ||||||
| 		return | 			continue | ||||||
|  | 		} | ||||||
|  | 		// close and delete | ||||||
|  | 		link.Close() | ||||||
|  | 		delete(t.links, id) | ||||||
| 	} | 	} | ||||||
| 	// close and delete |  | ||||||
| 	link.Close() |  | ||||||
| 	delete(t.links, id) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // process incoming messages | // process incoming messages | ||||||
|   | |||||||
| @@ -35,10 +35,11 @@ type link struct { | |||||||
|  |  | ||||||
| func newLink(s transport.Socket) *link { | func newLink(s transport.Socket) *link { | ||||||
| 	l := &link{ | 	l := &link{ | ||||||
| 		Socket:   s, | 		Socket:        s, | ||||||
| 		id:       uuid.New().String(), | 		id:            uuid.New().String(), | ||||||
| 		channels: make(map[string]time.Time), | 		channels:      make(map[string]time.Time), | ||||||
| 		closed:   make(chan bool), | 		closed:        make(chan bool), | ||||||
|  | 		lastKeepAlive: time.Now(), | ||||||
| 	} | 	} | ||||||
| 	go l.expiry() | 	go l.expiry() | ||||||
| 	return l | 	return l | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user