diff --git a/network/default.go b/network/default.go index c3c8d5db..a4fbd0b5 100644 --- a/network/default.go +++ b/network/default.go @@ -6,6 +6,7 @@ import ( "hash/fnv" "io" "math" + "math/rand" "sort" "sync" "time" @@ -88,6 +89,7 @@ type message struct { // newNetwork returns a new network node func newNetwork(opts ...Option) Network { + rand.Seed(time.Now().UnixNano()) options := DefaultOptions() for _, o := range opts { @@ -178,12 +180,11 @@ func newNetwork(opts ...Option) Network { func (n *network) Init(opts ...Option) error { n.Lock() - defer n.Unlock() - // TODO: maybe only allow reinit of certain opts for _, o := range opts { o(&n.options) } + n.Unlock() return nil } @@ -191,10 +192,8 @@ func (n *network) Init(opts ...Option) error { // Options returns network options func (n *network) Options() Options { n.RLock() - defer n.RUnlock() - options := n.options - + n.RUnlock() return options } @@ -332,8 +331,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { // someone requested the route n.sendTo("advert", ControlChannel, peer, msg) default: - // send to all since we can't get anything - n.sendMsg("advert", ControlChannel, msg) + // no one to send to } case <-n.closed: return @@ -498,12 +496,12 @@ func (n *network) getHopCount(rtr string) int { } // the route origin is our peer - if _, ok := n.peers[rtr]; ok { + if _, ok := n.node.peers[rtr]; ok { return 10 } // the route origin is the peer of our peer - for _, peer := range n.peers { + for _, peer := range n.node.peers { for id := range peer.peers { if rtr == id { return 100 @@ -944,6 +942,13 @@ func (n *network) manage() { case <-n.closed: return case <-announce.C: + // jitter + j := rand.Int63n(30) + time.Sleep(time.Duration(j) * time.Second) + + // TODO: intermittently flip between peer selection + // and full broadcast pick a random set of peers + msg := PeersToProto(n.node, MaxDepth) // advertise yourself to the network if err := n.sendMsg("peer", NetworkChannel, msg); err != nil { 
diff --git a/tunnel/default.go b/tunnel/default.go index ff66ea6f..aadd4870 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -14,7 +14,7 @@ import ( var ( // DiscoverTime sets the time at which we fire discover messages - DiscoverTime = 60 * time.Second + DiscoverTime = 30 * time.Second // KeepAliveTime defines time interval we send keepalive messages to outbound links KeepAliveTime = 30 * time.Second // ReconnectTime defines time interval we periodically attempt to reconnect dead links @@ -42,6 +42,9 @@ type tun struct { // close channel closed chan bool + // control channel to indicate link change + updated chan bool + // a map of sessions based on Micro-Tunnel-Channel sessions map[string]*session @@ -54,6 +57,7 @@ type tun struct { // create new tunnel on top of a link func newTunnel(opts ...Option) *tun { + rand.Seed(time.Now().UnixNano()) options := DefaultOptions() for _, o := range opts { o(&options) @@ -65,6 +69,7 @@ func newTunnel(opts ...Option) *tun { token: options.Token, send: make(chan *message, 128), closed: make(chan bool), + updated: make(chan bool, 3), sessions: make(map[string]*session), links: make(map[string]*link), } @@ -222,6 +227,12 @@ func (t *tun) manageLink(link *link) { discover := time.NewTicker(DiscoverTime) defer discover.Stop() + wait := func(d time.Duration) { + // jitter + j := rand.Int63n(int64(d / 2)) + time.Sleep(time.Duration(j) * time.Second) + } + for { select { case <-t.closed: @@ -229,18 +240,29 @@ func (t *tun) manageLink(link *link) { case <-link.closed: return case <-discover.C: - // send a discovery message to all links - if err := t.sendMsg("discover", link); err != nil { - log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err) - } + go func() { + // wait half the discover time + wait(DiscoverTime) + + // send a discovery message to the link + log.Debugf("Tunnel sending discover to link: %v", link.Remote()) + if err := t.sendMsg("discover", link); err != nil { + log.Debugf("Tunnel 
failed to send discover to link %s: %v", link.Remote(), err) + } + }() case <-keepalive.C: - // send keepalive message - log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) - if err := t.sendMsg("keepalive", link); err != nil { - log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) - t.delLink(link.Remote()) - return - } + go func() { + // wait half the keepalive time + wait(KeepAliveTime) + + // send keepalive message + log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) + if err := t.sendMsg("keepalive", link); err != nil { + log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) + t.delLink(link.Remote()) + return + } + }() } } } @@ -248,6 +270,7 @@ func (t *tun) manageLink(link *link) { // manageLinks is a function that can be called to immediately to link setup // it purges dead links while generating new links for any nodes not connected func (t *tun) manageLinks() { + // if we need to notify of updates delLinks := make(map[*link]string) connected := make(map[string]bool) @@ -304,6 +327,9 @@ func (t *tun) manageLinks() { } t.Unlock() + + // links were deleted so notify + go t.notify() } var wg sync.WaitGroup @@ -324,18 +350,22 @@ func (t *tun) manageLinks() { return } - // save the link t.Lock() - defer t.Unlock() // just check nothing else was setup in the interim if _, ok := t.links[node]; ok { link.Close() + t.Unlock() return } // save the link t.links[node] = link + + t.Unlock() + + // notify ourselves of the change + go t.notify() }(node) } @@ -345,48 +375,37 @@ func (t *tun) manageLinks() { // process outgoing messages sent by all local sessions func (t *tun) process() { + // wait for the first update + <-t.updated + + // get the list of links + t.RLock() + var links []*link + for _, link := range t.links { + links = append(links, link) + } + t.RUnlock() + // manage the send buffer // all pseudo sessions throw everything down this for { select { case msg := <-t.send: - 
newMsg := &transport.Message{ - Header: make(map[string]string), - } - - // set the data - if msg.data != nil { - for k, v := range msg.data.Header { - newMsg.Header[k] = v - } - newMsg.Body = msg.data.Body - } - - // set message head - newMsg.Header["Micro-Tunnel"] = msg.typ - - // set the tunnel id on the outgoing message - newMsg.Header["Micro-Tunnel-Id"] = msg.tunnel - - // set the tunnel channel on the outgoing message - newMsg.Header["Micro-Tunnel-Channel"] = msg.channel - - // set the session id - newMsg.Header["Micro-Tunnel-Session"] = msg.session - - // send the message via the interface - t.RLock() - - if len(t.links) == 0 { + // no links yet + if len(links) == 0 { + // TODO: should we block here rather than throwing away messages... + // Or should we return an error? log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) + time.Sleep(time.Millisecond * 100) + continue } - var sent bool - var err error + // build a list of links to send to var sendTo []*link + var err error // build the list of links ot send to - for node, link := range t.links { + for _, link := range links { // get the values we need link.RLock() id := link.id @@ -397,7 +416,7 @@ func (t *tun) process() { // if the link is not connected skip it if !connected { - log.Debugf("Link for node %s not connected", node) + log.Debugf("Link for node %s not connected", id) err = errors.New("link not connected") continue } @@ -406,7 +425,7 @@ func (t *tun) process() { // and the message is being sent outbound via // a dialled connection don't use this link if loopback && msg.outbound { - log.Tracef("Link for node %s is loopback", node) + log.Tracef("Link for node %s is loopback", id) err = errors.New("link is loopback") continue } @@ -414,7 +433,7 @@ func (t *tun) process() { // if the message was being returned by the loopback listener // send it back up the loopback link only if msg.loopback && !loopback { - log.Tracef("Link for message %s is loopback", node) + 
log.Tracef("Link for message %s is loopback", id) err = errors.New("link is not loopback") continue } @@ -439,55 +458,128 @@ func (t *tun) process() { sendTo = append(sendTo, link) } - t.RUnlock() - - // send the message - for _, link := range sendTo { - // send the message via the current link - log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote()) - - if errr := link.Send(newMsg); errr != nil { - log.Debugf("Tunnel error sending %+v to %s: %v", newMsg.Header, link.Remote(), errr) - err = errors.New(errr.Error()) - t.delLink(link.Remote()) - continue - } - - // is sent - sent = true - - // keep sending broadcast messages - if msg.mode > Unicast { - continue - } - - // break on unicast - break - } - - var gerr error - - // set the error if not sent - if !sent { - gerr = err - } - - // skip if its not been set - if msg.errChan == nil { + // no links to send to + if len(sendTo) == 0 { + t.respond(msg, err) continue } - // return error non blocking - select { - case msg.errChan <- gerr: - default: + // send the message + t.sendTo(sendTo, msg) + case <-t.updated: + t.RLock() + var newLinks []*link + for _, link := range t.links { + newLinks = append(links, link) } + t.RUnlock() + // links were updated + links = newLinks case <-t.closed: return } } } +// send response back for a message to the caller +func (t *tun) respond(msg *message, err error) { + select { + case msg.errChan <- err: + default: + } +} + +// sendTo sends a message to the chosen links +func (t *tun) sendTo(links []*link, msg *message) error { + // the function that sends the actual message + send := func(link *link, msg *transport.Message) error { + if err := link.Send(msg); err != nil { + log.Debugf("Tunnel error sending %+v to %s: %v", msg.Header, link.Remote(), err) + t.delLink(link.Remote()) + return err + } + return nil + } + + newMsg := &transport.Message{ + Header: make(map[string]string), + } + + // set the data + if msg.data != nil { + for k, v := range msg.data.Header { + 
newMsg.Header[k] = v + } + newMsg.Body = msg.data.Body + } + + // set message head + newMsg.Header["Micro-Tunnel"] = msg.typ + // set the tunnel id on the outgoing message + newMsg.Header["Micro-Tunnel-Id"] = msg.tunnel + // set the tunnel channel on the outgoing message + newMsg.Header["Micro-Tunnel-Channel"] = msg.channel + // set the session id + newMsg.Header["Micro-Tunnel-Session"] = msg.session + + // error channel for call + errChan := make(chan error, len(links)) + + // send the message + for _, link := range links { + // send the message via the current link + log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote()) + + // blast it in a go routine since its multicast/broadcast + if msg.mode > Unicast { + // make a copy + m := &transport.Message{ + Header: make(map[string]string), + } + copy(m.Body, newMsg.Body) + for k, v := range newMsg.Header { + m.Header[k] = v + } + + // execute in parallel + go func() { + errChan <- send(link, m) + }() + + continue + } + + // otherwise send as unicast + if err := send(link, newMsg); err != nil { + // put in the error chan if it failed + errChan <- err + continue + } + + // sent successfully so just return + t.respond(msg, nil) + return nil + } + + // either all unicast attempts failed or we're + // checking the multicast/broadcast attempts + + var err error + + // check all the errors + for i := 0; i < len(links); i++ { + err = <-errChan + // success + if err == nil { + break + } + } + + // return error. 
it's non blocking + t.respond(msg, err) + return err +} + func (t *tun) delLink(remote string) { t.Lock() @@ -502,9 +594,21 @@ func (t *tun) delLink(remote string) { delete(t.links, id) } + // let ourselves know of a link change + go t.notify() + t.Unlock() } +// notify ourselves of a link change +func (t *tun) notify() { + select { + case t.updated <- true: + // unblock after a second + case <-time.After(time.Second): + } +} + // process incoming messages func (t *tun) listen(link *link) { // remove the link on exit @@ -856,6 +960,9 @@ func (t *tun) setupLinks() { // wait for all threads to finish wg.Wait() + + // notify ourselves of the update + t.notify() } // connect the tunnel to all the nodes and listen for incoming tunnel connections @@ -1042,19 +1149,7 @@ func (t *tun) Close() error { // Dial an address func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { - log.Debugf("Tunnel dialing %s", channel) - c, ok := t.newSession(channel, t.newSessionId()) - if !ok { - return nil, errors.New("error dialing " + channel) - } - // set remote - c.remote = channel - // set local - c.local = "local" - // outbound session - c.outbound = true - - // get opts + // get the options options := DialOptions{ Timeout: DefaultDialTimeout, Wait: true, @@ -1064,12 +1159,28 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { o(&options) } - // set the multicast option + log.Debugf("Tunnel dialing %s", channel) + + // create a new session + c, ok := t.newSession(channel, t.newSessionId()) + if !ok { + return nil, errors.New("error dialing " + channel) + } + + // set remote + c.remote = channel + // set local + c.local = "local" + // outbound session + c.outbound = true + // set the mode of connection unicast/multicast/broadcast c.mode = options.Mode // set the dial timeout c.dialTimeout = options.Timeout // set read timeout set to never c.readTimeout = time.Duration(-1) + // set the link + c.link = options.Link var links []*link // did we 
measure the rtt @@ -1080,7 +1191,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // non multicast so we need to find the link for _, link := range t.links { // use the link specified it its available - if id := options.Link; len(id) > 0 && link.id != id { + if len(c.link) > 0 && link.id != c.link { continue } @@ -1096,20 +1207,36 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { t.RUnlock() - // link not found and one was specified so error out - if len(links) == 0 && len(options.Link) > 0 { - // delete session and return error - t.delSession(c.channel, c.session) - log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) - return nil, ErrLinkNotFound + // link option was specified to pick the link + if len(options.Link) > 0 { + // link not found and one was specified so error out + if len(links) == 0 { + // delete session and return error + t.delSession(c.channel, c.session) + log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) + return nil, ErrLinkNotFound + } + + // assume discovered because we picked + c.discovered = true + + // link asked for and found and now + // we've been asked not to wait so return + if !options.Wait { + c.accepted = true + return c, nil + } } // discovered so set the link if not multicast if c.discovered && c.mode == Unicast { - // pickLink will pick the best link - link := t.pickLink(links) - // set the link - c.link = link.id + // pick a link if not specified + if len(c.link) == 0 { + // pickLink will pick the best link + link := t.pickLink(links) + // set the link + c.link = link.id + } } // if its not already discovered we need to attempt to do so @@ -1143,12 +1270,8 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { // return early if its not unicast // we will not wait for "open" for multicast - if c.mode != Unicast { - return c, nil - } - - // if we're not told to wait - if 
!options.Wait { + // and we will not wait if told not to + if c.mode != Unicast || !options.Wait { return c, nil } @@ -1241,9 +1364,6 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) { // to the existign sessions go tl.process() - // announces the listener channel to others - go tl.announce() - // return the listener return tl, nil } diff --git a/tunnel/listener.go b/tunnel/listener.go index 6dbec5c8..16f37e35 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -2,7 +2,6 @@ package tunnel import ( "io" - "time" "github.com/micro/go-micro/util/log" ) @@ -24,24 +23,6 @@ type tunListener struct { delFunc func() } -// periodically announce self the channel being listened on -func (t *tunListener) announce() { - tick := time.NewTicker(time.Second * 30) - defer tick.Stop() - - // first announcement - t.session.Announce() - - for { - select { - case <-tick.C: - t.session.Announce() - case <-t.closed: - return - } - } -} - func (t *tunListener) process() { // our connection map for session conns := make(map[string]*session)