diff --git a/transport/quic/quic.go b/transport/quic/quic.go index 92854e3c..956c58f9 100644 --- a/transport/quic/quic.go +++ b/transport/quic/quic.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/gob" + "time" "github.com/lucas-clemente/quic-go" "github.com/micro/go-micro/transport" @@ -43,6 +44,9 @@ func (q *quicSocket) Recv(m *transport.Message) error { } func (q *quicSocket) Send(m *transport.Message) error { + // set the write deadline + q.st.SetWriteDeadline(time.Now().Add(time.Second * 10)) + // send the data return q.enc.Encode(m) } @@ -113,7 +117,10 @@ func (q *quicTransport) Dial(addr string, opts ...transport.DialOption) (transpo NextProtos: []string{"http/1.1"}, } } - s, err := quic.DialAddr(addr, config, &quic.Config{KeepAlive: true}) + s, err := quic.DialAddr(addr, config, &quic.Config{ + IdleTimeout: time.Minute * 2, + KeepAlive: true, + }) if err != nil { return nil, err } diff --git a/tunnel/default.go b/tunnel/default.go index 001c7156..b3ba8cc7 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -162,9 +162,7 @@ func (t *tun) announce(channel, session string, link *link) { // if no channel is present we've been asked to discover all channels if len(channel) == 0 { // get the list of channels - t.RLock() channels := t.listChannels() - t.RUnlock() // if there are no channels continue if len(channels) == 0 { @@ -220,9 +218,8 @@ func (t *tun) monitor() { log.Debugf("Tunnel failed to setup node link to %s: %v", node, err) continue } - // set the link id to the node - // TODO: hash it - link.id = node + // set the link id to the remote address + link.id = link.Remote() // save the link t.Lock() t.links[node] = link @@ -267,7 +264,7 @@ func (t *tun) process() { newMsg.Header["Micro-Tunnel-Token"] = t.token // send the message via the interface - t.Lock() + t.RLock() if len(t.links) == 0 { log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) @@ -275,7 +272,9 @@ func (t *tun) process() { var sent bool var err error + var sendTo []*link + // build the list of links ot send to for node, link := range t.links { // if the link is not connected skip it if !link.connected { @@ -318,16 +317,21 @@ func (t *tun) process() { } } + // add to link list + sendTo = append(sendTo, link) + } + + t.RUnlock() + + // send the message + for _, link := range sendTo { // send the message via the current link - log.Debugf("Sending %+v to %s", newMsg, node) + log.Debugf("Sending %+v to %s", newMsg, link.Remote()) if errr := link.Send(newMsg); errr != nil { - log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, node, errr) + log.Debugf("Tunnel error sending %+v to %s: %v", newMsg, link.Remote(), errr) err = errors.New(errr.Error()) - // kill the link - link.Close() - // delete the link - delete(t.links, node) + t.delLink(link.Remote()) continue } @@ -343,10 +347,9 @@ func (t *tun) process() { break } - t.Unlock() + var gerr error // set the error if not sent - var gerr error if !sent { gerr = err } @@ -367,17 +370,19 @@ func (t *tun) process() { } } -func (t *tun) delLink(id string) { +func (t *tun) delLink(remote string) { t.Lock() defer t.Unlock() + // get the link - link, ok := t.links[id] - if !ok { - return + for id, link := range t.links { + if link.id != remote { + continue + } + // close and delete + link.Close() + delete(t.links, id) } - // close and delete - link.Close() - delete(t.links, id) } // process incoming messages diff --git a/tunnel/link.go b/tunnel/link.go index a8423c73..d43b927c 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -35,10 +35,11 @@ type link struct { func newLink(s transport.Socket) *link { l := &link{ - Socket: s, - id: uuid.New().String(), - channels: make(map[string]time.Time), - closed: make(chan bool), + Socket: s, + id: uuid.New().String(), + channels: make(map[string]time.Time), + closed: make(chan bool), + lastKeepAlive: time.Now(), } go l.expiry() return l