save cruft

This commit is contained in:
Asim Aslam 2019-12-12 12:27:46 +00:00
parent 27af221fd2
commit e260cc4a24
3 changed files with 255 additions and 149 deletions

View File

@ -6,6 +6,7 @@ import (
"hash/fnv" "hash/fnv"
"io" "io"
"math" "math"
"math/rand"
"sort" "sort"
"sync" "sync"
"time" "time"
@ -88,6 +89,7 @@ type message struct {
// newNetwork returns a new network node // newNetwork returns a new network node
func newNetwork(opts ...Option) Network { func newNetwork(opts ...Option) Network {
rand.Seed(time.Now().UnixNano())
options := DefaultOptions() options := DefaultOptions()
for _, o := range opts { for _, o := range opts {
@ -178,12 +180,11 @@ func newNetwork(opts ...Option) Network {
func (n *network) Init(opts ...Option) error { func (n *network) Init(opts ...Option) error {
n.Lock() n.Lock()
defer n.Unlock()
// TODO: maybe only allow reinit of certain opts // TODO: maybe only allow reinit of certain opts
for _, o := range opts { for _, o := range opts {
o(&n.options) o(&n.options)
} }
n.Unlock()
return nil return nil
} }
@ -191,10 +192,8 @@ func (n *network) Init(opts ...Option) error {
// Options returns network options // Options returns network options
func (n *network) Options() Options { func (n *network) Options() Options {
n.RLock() n.RLock()
defer n.RUnlock()
options := n.options options := n.options
n.RUnlock()
return options return options
} }
@ -332,8 +331,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) {
// someone requested the route // someone requested the route
n.sendTo("advert", ControlChannel, peer, msg) n.sendTo("advert", ControlChannel, peer, msg)
default: default:
// send to all since we can't get anything // no one to send to
n.sendMsg("advert", ControlChannel, msg)
} }
case <-n.closed: case <-n.closed:
return return
@ -498,12 +496,12 @@ func (n *network) getHopCount(rtr string) int {
} }
// the route origin is our peer // the route origin is our peer
if _, ok := n.peers[rtr]; ok { if _, ok := n.node.peers[rtr]; ok {
return 10 return 10
} }
// the route origin is the peer of our peer // the route origin is the peer of our peer
for _, peer := range n.peers { for _, peer := range n.node.peers {
for id := range peer.peers { for id := range peer.peers {
if rtr == id { if rtr == id {
return 100 return 100
@ -944,6 +942,13 @@ func (n *network) manage() {
case <-n.closed: case <-n.closed:
return return
case <-announce.C: case <-announce.C:
// jitter
j := rand.Int63n(30)
time.Sleep(time.Duration(j) * time.Second)
// TODO: intermittently flip between peer selection
// and full broadcast pick a random set of peers
msg := PeersToProto(n.node, MaxDepth) msg := PeersToProto(n.node, MaxDepth)
// advertise yourself to the network // advertise yourself to the network
if err := n.sendMsg("peer", NetworkChannel, msg); err != nil { if err := n.sendMsg("peer", NetworkChannel, msg); err != nil {

View File

@ -14,7 +14,7 @@ import (
var ( var (
// DiscoverTime sets the time at which we fire discover messages // DiscoverTime sets the time at which we fire discover messages
DiscoverTime = 60 * time.Second DiscoverTime = 30 * time.Second
// KeepAliveTime defines time interval we send keepalive messages to outbound links // KeepAliveTime defines time interval we send keepalive messages to outbound links
KeepAliveTime = 30 * time.Second KeepAliveTime = 30 * time.Second
// ReconnectTime defines time interval we periodically attempt to reconnect dead links // ReconnectTime defines time interval we periodically attempt to reconnect dead links
@ -42,6 +42,9 @@ type tun struct {
// close channel // close channel
closed chan bool closed chan bool
// control channel to indicate link change
updated chan bool
// a map of sessions based on Micro-Tunnel-Channel // a map of sessions based on Micro-Tunnel-Channel
sessions map[string]*session sessions map[string]*session
@ -54,6 +57,7 @@ type tun struct {
// create new tunnel on top of a link // create new tunnel on top of a link
func newTunnel(opts ...Option) *tun { func newTunnel(opts ...Option) *tun {
rand.Seed(time.Now().UnixNano())
options := DefaultOptions() options := DefaultOptions()
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
@ -65,6 +69,7 @@ func newTunnel(opts ...Option) *tun {
token: options.Token, token: options.Token,
send: make(chan *message, 128), send: make(chan *message, 128),
closed: make(chan bool), closed: make(chan bool),
updated: make(chan bool, 3),
sessions: make(map[string]*session), sessions: make(map[string]*session),
links: make(map[string]*link), links: make(map[string]*link),
} }
@ -222,6 +227,12 @@ func (t *tun) manageLink(link *link) {
discover := time.NewTicker(DiscoverTime) discover := time.NewTicker(DiscoverTime)
defer discover.Stop() defer discover.Stop()
wait := func(d time.Duration) {
// jitter
j := rand.Int63n(int64(d / 2))
time.Sleep(time.Duration(j) * time.Second)
}
for { for {
select { select {
case <-t.closed: case <-t.closed:
@ -229,18 +240,29 @@ func (t *tun) manageLink(link *link) {
case <-link.closed: case <-link.closed:
return return
case <-discover.C: case <-discover.C:
// send a discovery message to all links go func() {
if err := t.sendMsg("discover", link); err != nil { // wait half the discover time
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err) wait(DiscoverTime)
}
// send a discovery message to the link
log.Debugf("Tunnel sending discover to link: %v", link.Remote())
if err := t.sendMsg("discover", link); err != nil {
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
}
}()
case <-keepalive.C: case <-keepalive.C:
// send keepalive message go func() {
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote()) // wait half the keepalive time
if err := t.sendMsg("keepalive", link); err != nil { wait(KeepAliveTime)
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote()) // send keepalive message
return log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
} if err := t.sendMsg("keepalive", link); err != nil {
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote())
return
}
}()
} }
} }
} }
@ -248,6 +270,7 @@ func (t *tun) manageLink(link *link) {
// manageLinks is a function that can be called to immediately to link setup // manageLinks is a function that can be called to immediately to link setup
// it purges dead links while generating new links for any nodes not connected // it purges dead links while generating new links for any nodes not connected
func (t *tun) manageLinks() { func (t *tun) manageLinks() {
// if we need to notify of updates
delLinks := make(map[*link]string) delLinks := make(map[*link]string)
connected := make(map[string]bool) connected := make(map[string]bool)
@ -304,6 +327,9 @@ func (t *tun) manageLinks() {
} }
t.Unlock() t.Unlock()
// links were deleted so notify
go t.notify()
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -324,18 +350,22 @@ func (t *tun) manageLinks() {
return return
} }
// save the link
t.Lock() t.Lock()
defer t.Unlock()
// just check nothing else was setup in the interim // just check nothing else was setup in the interim
if _, ok := t.links[node]; ok { if _, ok := t.links[node]; ok {
link.Close() link.Close()
t.Unlock()
return return
} }
// save the link // save the link
t.links[node] = link t.links[node] = link
t.Unlock()
// notify ourselves of the change
go t.notify()
}(node) }(node)
} }
@ -345,48 +375,37 @@ func (t *tun) manageLinks() {
// process outgoing messages sent by all local sessions // process outgoing messages sent by all local sessions
func (t *tun) process() { func (t *tun) process() {
// wait for the first update
<-t.updated
// get the list of links
t.RLock()
var links []*link
for _, link := range t.links {
links = append(links, link)
}
t.RUnlock()
// manage the send buffer // manage the send buffer
// all pseudo sessions throw everything down this // all pseudo sessions throw everything down this
for { for {
select { select {
case msg := <-t.send: case msg := <-t.send:
newMsg := &transport.Message{ // no links yet
Header: make(map[string]string), if len(links) == 0 {
} // TODO: should we block here rather than throwing away messages...
// Or should we return an error?
// set the data
if msg.data != nil {
for k, v := range msg.data.Header {
newMsg.Header[k] = v
}
newMsg.Body = msg.data.Body
}
// set message head
newMsg.Header["Micro-Tunnel"] = msg.typ
// set the tunnel id on the outgoing message
newMsg.Header["Micro-Tunnel-Id"] = msg.tunnel
// set the tunnel channel on the outgoing message
newMsg.Header["Micro-Tunnel-Channel"] = msg.channel
// set the session id
newMsg.Header["Micro-Tunnel-Session"] = msg.session
// send the message via the interface
t.RLock()
if len(t.links) == 0 {
log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel) log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
time.Sleep(time.Millisecond * 100)
continue
} }
var sent bool // build a list of links to send to
var err error
var sendTo []*link var sendTo []*link
var err error
// build the list of links ot send to // build the list of links ot send to
for node, link := range t.links { for _, link := range links {
// get the values we need // get the values we need
link.RLock() link.RLock()
id := link.id id := link.id
@ -397,7 +416,7 @@ func (t *tun) process() {
// if the link is not connected skip it // if the link is not connected skip it
if !connected { if !connected {
log.Debugf("Link for node %s not connected", node) log.Debugf("Link for node %s not connected", id)
err = errors.New("link not connected") err = errors.New("link not connected")
continue continue
} }
@ -406,7 +425,7 @@ func (t *tun) process() {
// and the message is being sent outbound via // and the message is being sent outbound via
// a dialled connection don't use this link // a dialled connection don't use this link
if loopback && msg.outbound { if loopback && msg.outbound {
log.Tracef("Link for node %s is loopback", node) log.Tracef("Link for node %s is loopback", id)
err = errors.New("link is loopback") err = errors.New("link is loopback")
continue continue
} }
@ -414,7 +433,7 @@ func (t *tun) process() {
// if the message was being returned by the loopback listener // if the message was being returned by the loopback listener
// send it back up the loopback link only // send it back up the loopback link only
if msg.loopback && !loopback { if msg.loopback && !loopback {
log.Tracef("Link for message %s is loopback", node) log.Tracef("Link for message %s is loopback", id)
err = errors.New("link is not loopback") err = errors.New("link is not loopback")
continue continue
} }
@ -439,55 +458,128 @@ func (t *tun) process() {
sendTo = append(sendTo, link) sendTo = append(sendTo, link)
} }
t.RUnlock() // no links to send to
if len(sendTo) == 0 {
// send the message t.respond(msg, err)
for _, link := range sendTo {
// send the message via the current link
log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote())
if errr := link.Send(newMsg); errr != nil {
log.Debugf("Tunnel error sending %+v to %s: %v", newMsg.Header, link.Remote(), errr)
err = errors.New(errr.Error())
t.delLink(link.Remote())
continue
}
// is sent
sent = true
// keep sending broadcast messages
if msg.mode > Unicast {
continue
}
// break on unicast
break
}
var gerr error
// set the error if not sent
if !sent {
gerr = err
}
// skip if its not been set
if msg.errChan == nil {
continue continue
} }
// return error non blocking // send the message
select { t.sendTo(sendTo, msg)
case msg.errChan <- gerr: case <-t.updated:
default: t.RLock()
var newLinks []*link
for _, link := range t.links {
newLinks = append(links, link)
} }
t.RUnlock()
// links were updated
links = newLinks
case <-t.closed: case <-t.closed:
return return
} }
} }
} }
// send response back for a message to the caller
func (t *tun) respond(msg *message, err error) {
select {
case msg.errChan <- err:
default:
}
}
// sendTo sends a message to the chosen links
func (t *tun) sendTo(links []*link, msg *message) error {
// the function that sends the actual message
send := func(link *link, msg *transport.Message) error {
if err := link.Send(msg); err != nil {
log.Debugf("Tunnel error sending %+v to %s: %v", msg.Header, link.Remote(), err)
t.delLink(link.Remote())
return err
}
return nil
}
newMsg := &transport.Message{
Header: make(map[string]string),
}
// set the data
if msg.data != nil {
for k, v := range msg.data.Header {
newMsg.Header[k] = v
}
newMsg.Body = msg.data.Body
}
// set message head
newMsg.Header["Micro-Tunnel"] = msg.typ
// set the tunnel id on the outgoing message
newMsg.Header["Micro-Tunnel-Id"] = msg.tunnel
// set the tunnel channel on the outgoing message
newMsg.Header["Micro-Tunnel-Channel"] = msg.channel
// set the session id
newMsg.Header["Micro-Tunnel-Session"] = msg.session
// error channel for call
errChan := make(chan error, len(links))
// send the message
for _, link := range links {
// send the message via the current link
log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote())
// blast it in a go routine since its multicast/broadcast
if msg.mode > Unicast {
// make a copy
m := &transport.Message{
Header: make(map[string]string),
}
copy(m.Body, newMsg.Body)
for k, v := range newMsg.Header {
m.Header[k] = v
}
// execute in parallel
go func() {
errChan <- send(link, m)
}()
continue
}
// otherwise send as unicast
if err := send(link, newMsg); err != nil {
// put in the error chan if it failed
errChan <- err
continue
}
// sent successfully so just return
t.respond(msg, nil)
return nil
}
// either all unicast attempts failed or we're
// checking the multicast/broadcast attempts
var err error
// check all the errors
for i := 0; i < len(links); i++ {
err = <-errChan
// success
if err == nil {
break
}
}
// return error. it's non blocking
t.respond(msg, err)
return err
}
func (t *tun) delLink(remote string) { func (t *tun) delLink(remote string) {
t.Lock() t.Lock()
@ -502,9 +594,21 @@ func (t *tun) delLink(remote string) {
delete(t.links, id) delete(t.links, id)
} }
// let ourselves know of a link change
go t.notify()
t.Unlock() t.Unlock()
} }
// notify ourselves of a link change
func (t *tun) notify() {
select {
case t.updated <- true:
// unblock after a second
case <-time.After(time.Second):
}
}
// process incoming messages // process incoming messages
func (t *tun) listen(link *link) { func (t *tun) listen(link *link) {
// remove the link on exit // remove the link on exit
@ -856,6 +960,9 @@ func (t *tun) setupLinks() {
// wait for all threads to finish // wait for all threads to finish
wg.Wait() wg.Wait()
// notify ourselves of the update
t.notify()
} }
// connect the tunnel to all the nodes and listen for incoming tunnel connections // connect the tunnel to all the nodes and listen for incoming tunnel connections
@ -1042,19 +1149,7 @@ func (t *tun) Close() error {
// Dial an address // Dial an address
func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) { func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
log.Debugf("Tunnel dialing %s", channel) // get the options
c, ok := t.newSession(channel, t.newSessionId())
if !ok {
return nil, errors.New("error dialing " + channel)
}
// set remote
c.remote = channel
// set local
c.local = "local"
// outbound session
c.outbound = true
// get opts
options := DialOptions{ options := DialOptions{
Timeout: DefaultDialTimeout, Timeout: DefaultDialTimeout,
Wait: true, Wait: true,
@ -1064,12 +1159,28 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
o(&options) o(&options)
} }
// set the multicast option log.Debugf("Tunnel dialing %s", channel)
// create a new session
c, ok := t.newSession(channel, t.newSessionId())
if !ok {
return nil, errors.New("error dialing " + channel)
}
// set remote
c.remote = channel
// set local
c.local = "local"
// outbound session
c.outbound = true
// set the mode of connection unicast/multicast/broadcast
c.mode = options.Mode c.mode = options.Mode
// set the dial timeout // set the dial timeout
c.dialTimeout = options.Timeout c.dialTimeout = options.Timeout
// set read timeout set to never // set read timeout set to never
c.readTimeout = time.Duration(-1) c.readTimeout = time.Duration(-1)
// set the link
c.link = options.Link
var links []*link var links []*link
// did we measure the rtt // did we measure the rtt
@ -1080,7 +1191,7 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// non multicast so we need to find the link // non multicast so we need to find the link
for _, link := range t.links { for _, link := range t.links {
// use the link specified it its available // use the link specified it its available
if id := options.Link; len(id) > 0 && link.id != id { if len(c.link) > 0 && link.id != c.link {
continue continue
} }
@ -1096,20 +1207,36 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
t.RUnlock() t.RUnlock()
// link not found and one was specified so error out // link option was specified to pick the link
if len(links) == 0 && len(options.Link) > 0 { if len(options.Link) > 0 {
// delete session and return error // link not found and one was specified so error out
t.delSession(c.channel, c.session) if len(links) == 0 {
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound) // delete session and return error
return nil, ErrLinkNotFound t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound)
return nil, ErrLinkNotFound
}
// assume discovered because we picked
c.discovered = true
// link asked for and found and now
// we've been asked not to wait so return
if !options.Wait {
c.accepted = true
return c, nil
}
} }
// discovered so set the link if not multicast // discovered so set the link if not multicast
if c.discovered && c.mode == Unicast { if c.discovered && c.mode == Unicast {
// pickLink will pick the best link // pick a link if not specified
link := t.pickLink(links) if len(c.link) == 0 {
// set the link // pickLink will pick the best link
c.link = link.id link := t.pickLink(links)
// set the link
c.link = link.id
}
} }
// if its not already discovered we need to attempt to do so // if its not already discovered we need to attempt to do so
@ -1143,12 +1270,8 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// return early if its not unicast // return early if its not unicast
// we will not wait for "open" for multicast // we will not wait for "open" for multicast
if c.mode != Unicast { // and we will not wait it told not to
return c, nil if c.mode != Unicast || !options.Wait {
}
// if we're not told to wait
if !options.Wait {
return c, nil return c, nil
} }
@ -1241,9 +1364,6 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
// to the existign sessions // to the existign sessions
go tl.process() go tl.process()
// announces the listener channel to others
go tl.announce()
// return the listener // return the listener
return tl, nil return tl, nil
} }

View File

@ -2,7 +2,6 @@ package tunnel
import ( import (
"io" "io"
"time"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
) )
@ -24,24 +23,6 @@ type tunListener struct {
delFunc func() delFunc func()
} }
// periodically announce self the channel being listened on
func (t *tunListener) announce() {
tick := time.NewTicker(time.Second * 30)
defer tick.Stop()
// first announcement
t.session.Announce()
for {
select {
case <-tick.C:
t.session.Announce()
case <-t.closed:
return
}
}
}
func (t *tunListener) process() { func (t *tunListener) process() {
// our connection map for session // our connection map for session
conns := make(map[string]*session) conns := make(map[string]*session)