Add tunnel fixes for quic and keepalive

This commit is contained in:
Asim Aslam 2019-09-12 16:22:43 -07:00
parent ec6a30be37
commit 97cf478f71
3 changed files with 39 additions and 26 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/gob" "encoding/gob"
"time"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
@ -43,6 +44,9 @@ func (q *quicSocket) Recv(m *transport.Message) error {
} }
func (q *quicSocket) Send(m *transport.Message) error { func (q *quicSocket) Send(m *transport.Message) error {
// set the write deadline
q.st.SetWriteDeadline(time.Now().Add(time.Second * 10))
// send the data
return q.enc.Encode(m) return q.enc.Encode(m)
} }
@ -113,7 +117,10 @@ func (q *quicTransport) Dial(addr string, opts ...transport.DialOption) (transpo
NextProtos: []string{"http/1.1"}, NextProtos: []string{"http/1.1"},
} }
} }
s, err := quic.DialAddr(addr, config, &quic.Config{KeepAlive: true}) s, err := quic.DialAddr(addr, config, &quic.Config{
IdleTimeout: time.Minute * 2,
KeepAlive: true,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -162,9 +162,7 @@ func (t *tun) announce(channel, session string, link *link) {
// if no channel is present we've been asked to discover all channels // if no channel is present we've been asked to discover all channels
if len(channel) == 0 { if len(channel) == 0 {
// get the list of channels // get the list of channels
t.RLock()
channels := t.listChannels() channels := t.listChannels()
t.RUnlock()
// if there are no channels continue // if there are no channels continue
if len(channels) == 0 { if len(channels) == 0 {
@ -220,9 +218,8 @@ func (t *tun) monitor() {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
continue continue
} }
// set the link id to the node // set the link id to the remote address
// TODO: hash it link.id = link.Remote()
link.id = node
// save the link // save the link
t.Lock() t.Lock()
t.links[node] = link t.links[node] = link
@ -267,7 +264,7 @@ func (t *tun) process() {
newMsg.Header["Micro-Tunnel-Token"] = t.token newMsg.Header["Micro-Tunnel-Token"] = t.token
// send the message via the interface // send the message via the interface
t.Lock() t.RLock()
if len(t.links) == 0 { if len(t.links) == 0 {
log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
@ -275,7 +272,9 @@ func (t *tun) process() {
var sent bool var sent bool
var err error var err error
var sendTo []*link
// build the list of links ot send to
for node, link := range t.links { for node, link := range t.links {
// if the link is not connected skip it // if the link is not connected skip it
if !link.connected { if !link.connected {
@ -318,16 +317,21 @@ func (t *tun) process() {
} }
} }
// add to link list
sendTo = append(sendTo, link)
}
t.RUnlock()
// send the message
for _, link := range sendTo {
// send the message via the current link // send the message via the current link
log.Debugf("Sending %+v to %s", newMsg, node) log.Debugf("Sending %+v to %s", newMsg, link.Remote())
if errr := link.Send(newMsg); errr != nil { if errr := link.Send(newMsg); errr != nil {
log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, errr) log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, link.Remote(), errr)
err = errors.New(errr.Error()) err = errors.New(errr.Error())
// kill the link t.delLink(link.Remote())
link.Close()
// delete the link
delete(t.links, node)
continue continue
} }
@ -343,10 +347,9 @@ func (t *tun) process() {
break break
} }
t.Unlock() var gerr error
// set the error if not sent // set the error if not sent
var gerr error
if !sent { if !sent {
gerr = err gerr = err
} }
@ -367,18 +370,20 @@ func (t *tun) process() {
} }
} }
func (t *tun) delLink(id string) { func (t *tun) delLink(remote string) {
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
// get the link // get the link
link, ok := t.links[id] for id, link := range t.links {
if !ok { if link.id != remote {
return continue
} }
// close and delete // close and delete
link.Close() link.Close()
delete(t.links, id) delete(t.links, id)
} }
}
// process incoming messages // process incoming messages
func (t *tun) listen(link *link) { func (t *tun) listen(link *link) {

View File

@ -39,6 +39,7 @@ func newLink(s transport.Socket) *link {
id: uuid.New().String(), id: uuid.New().String(),
channels: make(map[string]time.Time), channels: make(map[string]time.Time),
closed: make(chan bool), closed: make(chan bool),
lastKeepAlive: time.Now(),
} }
go l.expiry() go l.expiry()
return l return l