Merge pull request #876 from milosgajdos83/peerlink-route-metric
Peerlink route metric
This commit is contained in:
		| @@ -111,7 +111,7 @@ func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { | |||||||
| 			Gateway: r.Gateway, | 			Gateway: r.Gateway, | ||||||
| 			Network: r.Network, | 			Network: r.Network, | ||||||
| 			Link:    r.Link, | 			Link:    r.Link, | ||||||
| 			Metric:  int(r.Metric), | 			Metric:  r.Metric, | ||||||
| 		}) | 		}) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ import ( | |||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"hash/fnv" | 	"hash/fnv" | ||||||
|  | 	"math" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| @@ -37,6 +38,8 @@ var ( | |||||||
| var ( | var ( | ||||||
| 	// ErrClientNotFound is returned when client for tunnel channel could not be found | 	// ErrClientNotFound is returned when client for tunnel channel could not be found | ||||||
| 	ErrClientNotFound = errors.New("client not found") | 	ErrClientNotFound = errors.New("client not found") | ||||||
|  | 	// ErrPeerLinkNotFound is returned when peer link could not be found in tunnel Links | ||||||
|  | 	ErrPeerLinkNotFound = errors.New("peer link not found") | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // network implements Network interface | // network implements Network interface | ||||||
| @@ -58,6 +61,8 @@ type network struct { | |||||||
|  |  | ||||||
| 	// tunClient is a map of tunnel clients keyed over tunnel channel names | 	// tunClient is a map of tunnel clients keyed over tunnel channel names | ||||||
| 	tunClient map[string]transport.Client | 	tunClient map[string]transport.Client | ||||||
|  | 	// peerLinks is a map of links for each peer | ||||||
|  | 	peerLinks map[string]tunnel.Link | ||||||
|  |  | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	// connected marks the network as connected | 	// connected marks the network as connected | ||||||
| @@ -138,6 +143,7 @@ func newNetwork(opts ...Option) Network { | |||||||
| 		server:    server, | 		server:    server, | ||||||
| 		client:    client, | 		client:    client, | ||||||
| 		tunClient: make(map[string]transport.Client), | 		tunClient: make(map[string]transport.Client), | ||||||
|  | 		peerLinks: make(map[string]tunnel.Link), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	network.node.network = network | 	network.node.network = network | ||||||
| @@ -246,19 +252,22 @@ func (n *network) resolve() { | |||||||
| } | } | ||||||
|  |  | ||||||
| // handleNetConn handles network announcement messages | // handleNetConn handles network announcement messages | ||||||
| func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message) { | func (n *network) handleNetConn(s tunnel.Session, msg chan *message) { | ||||||
| 	for { | 	for { | ||||||
| 		m := new(transport.Message) | 		m := new(transport.Message) | ||||||
| 		if err := sess.Recv(m); err != nil { | 		if err := s.Recv(m); err != nil { | ||||||
| 			log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) | 			log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) | ||||||
| 			if sessErr := sess.Close(); sessErr != nil { | 			if sessionErr := s.Close(); sessionErr != nil { | ||||||
| 				log.Debugf("Network tunnel [%s] closing connection error: %v", sessErr) | 				log.Debugf("Network tunnel [%s] closing connection error: %v", NetworkChannel, sessionErr) | ||||||
| 			} | 			} | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		select { | 		select { | ||||||
| 		case msg <- m: | 		case msg <- &message{ | ||||||
|  | 			msg:     m, | ||||||
|  | 			session: s, | ||||||
|  | 		}: | ||||||
| 		case <-n.closed: | 		case <-n.closed: | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| @@ -266,7 +275,7 @@ func (n *network) handleNetConn(sess tunnel.Session, msg chan *transport.Message | |||||||
| } | } | ||||||
|  |  | ||||||
| // acceptNetConn accepts connections from NetworkChannel | // acceptNetConn accepts connections from NetworkChannel | ||||||
| func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) { | func (n *network) acceptNetConn(l tunnel.Listener, recv chan *message) { | ||||||
| 	var i int | 	var i int | ||||||
| 	for { | 	for { | ||||||
| 		// accept a connection | 		// accept a connection | ||||||
| @@ -295,10 +304,41 @@ func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // updatePeerLinks updates link for a given peer | ||||||
|  | func (n *network) updatePeerLinks(peerAddr string, linkId string) error { | ||||||
|  | 	n.Lock() | ||||||
|  | 	defer n.Unlock() | ||||||
|  | 	log.Debugf("Network looking up link %s in the peer links", linkId) | ||||||
|  | 	// lookup the peer link | ||||||
|  | 	var peerLink tunnel.Link | ||||||
|  | 	for _, link := range n.tunnel.Links() { | ||||||
|  | 		if link.Id() == linkId { | ||||||
|  | 			peerLink = link | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if peerLink == nil { | ||||||
|  | 		return ErrPeerLinkNotFound | ||||||
|  | 	} | ||||||
|  | 	// if the peerLink is found in the returned links update peerLinks | ||||||
|  | 	log.Debugf("Network updating peer links for peer %s", peerAddr) | ||||||
|  | 	// add peerLink to the peerLinks map | ||||||
|  | 	if link, ok := n.peerLinks[peerAddr]; ok { | ||||||
|  | 		// if the existing has better Length then the new, replace it | ||||||
|  | 		if link.Length() < peerLink.Length() { | ||||||
|  | 			n.peerLinks[peerAddr] = peerLink | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		n.peerLinks[peerAddr] = peerLink | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // processNetChan processes messages received on NetworkChannel | // processNetChan processes messages received on NetworkChannel | ||||||
| func (n *network) processNetChan(listener tunnel.Listener) { | func (n *network) processNetChan(listener tunnel.Listener) { | ||||||
| 	// receive network message queue | 	// receive network message queue | ||||||
| 	recv := make(chan *transport.Message, 128) | 	recv := make(chan *message, 128) | ||||||
|  |  | ||||||
| 	// accept NetworkChannel connections | 	// accept NetworkChannel connections | ||||||
| 	go n.acceptNetConn(listener, recv) | 	go n.acceptNetConn(listener, recv) | ||||||
| @@ -307,12 +347,12 @@ func (n *network) processNetChan(listener tunnel.Listener) { | |||||||
| 		select { | 		select { | ||||||
| 		case m := <-recv: | 		case m := <-recv: | ||||||
| 			// switch on type of message and take action | 			// switch on type of message and take action | ||||||
| 			switch m.Header["Micro-Method"] { | 			switch m.msg.Header["Micro-Method"] { | ||||||
| 			case "connect": | 			case "connect": | ||||||
| 				// mark the time the message has been received | 				// mark the time the message has been received | ||||||
| 				now := time.Now() | 				now := time.Now() | ||||||
| 				pbNetConnect := &pbNet.Connect{} | 				pbNetConnect := &pbNet.Connect{} | ||||||
| 				if err := proto.Unmarshal(m.Body, pbNetConnect); err != nil { | 				if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil { | ||||||
| 					log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) | 					log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| @@ -327,6 +367,12 @@ func (n *network) processNetChan(listener tunnel.Listener) { | |||||||
| 					peers:    make(map[string]*node), | 					peers:    make(map[string]*node), | ||||||
| 					lastSeen: now, | 					lastSeen: now, | ||||||
| 				} | 				} | ||||||
|  | 				// update peer links | ||||||
|  | 				log.Debugf("Network updating peer link %s for peer: %s", m.session.Link(), pbNetConnect.Node.Address) | ||||||
|  | 				if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil { | ||||||
|  | 					log.Debugf("Network failed updating peer links: %s", err) | ||||||
|  | 				} | ||||||
|  | 				// add peer to the list of node peers | ||||||
| 				if err := n.node.AddPeer(peer); err == ErrPeerExists { | 				if err := n.node.AddPeer(peer); err == ErrPeerExists { | ||||||
| 					log.Debugf("Network peer exists, refreshing: %s", peer.id) | 					log.Debugf("Network peer exists, refreshing: %s", peer.id) | ||||||
| 					// update lastSeen time for the existing node | 					// update lastSeen time for the existing node | ||||||
| @@ -349,7 +395,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { | |||||||
| 				// mark the time the message has been received | 				// mark the time the message has been received | ||||||
| 				now := time.Now() | 				now := time.Now() | ||||||
| 				pbNetPeer := &pbNet.Peer{} | 				pbNetPeer := &pbNet.Peer{} | ||||||
| 				if err := proto.Unmarshal(m.Body, pbNetPeer); err != nil { | 				if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil { | ||||||
| 					log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) | 					log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| @@ -364,6 +410,11 @@ func (n *network) processNetChan(listener tunnel.Listener) { | |||||||
| 					peers:    make(map[string]*node), | 					peers:    make(map[string]*node), | ||||||
| 					lastSeen: now, | 					lastSeen: now, | ||||||
| 				} | 				} | ||||||
|  | 				// update peer links | ||||||
|  | 				log.Debugf("Network updating peer link %s for peer: %s", m.session.Link(), pbNetPeer.Node.Address) | ||||||
|  | 				if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil { | ||||||
|  | 					log.Debugf("Network failed updating peer links: %s", err) | ||||||
|  | 				} | ||||||
| 				if err := n.node.AddPeer(peer); err == nil { | 				if err := n.node.AddPeer(peer); err == nil { | ||||||
| 					// send a solicit message when discovering new peer | 					// send a solicit message when discovering new peer | ||||||
| 					msg := &pbRtr.Solicit{ | 					msg := &pbRtr.Solicit{ | ||||||
| @@ -393,7 +444,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { | |||||||
| 				} | 				} | ||||||
| 			case "close": | 			case "close": | ||||||
| 				pbNetClose := &pbNet.Close{} | 				pbNetClose := &pbNet.Close{} | ||||||
| 				if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { | 				if err := proto.Unmarshal(m.msg.Body, pbNetClose); err != nil { | ||||||
| 					log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) | 					log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| @@ -412,6 +463,10 @@ func (n *network) processNetChan(listener tunnel.Listener) { | |||||||
| 				if err := n.prunePeerRoutes(peer); err != nil { | 				if err := n.prunePeerRoutes(peer); err != nil { | ||||||
| 					log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) | 					log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) | ||||||
| 				} | 				} | ||||||
|  | 				// deelete peer from the peerLinks | ||||||
|  | 				n.Lock() | ||||||
|  | 				delete(n.peerLinks, pbNetClose.Node.Address) | ||||||
|  | 				n.Unlock() | ||||||
| 			} | 			} | ||||||
| 		case <-n.closed: | 		case <-n.closed: | ||||||
| 			return | 			return | ||||||
| @@ -521,6 +576,9 @@ func (n *network) prune() { | |||||||
| 			pruned := n.PruneStalePeerNodes(PruneTime) | 			pruned := n.PruneStalePeerNodes(PruneTime) | ||||||
| 			for id, peer := range pruned { | 			for id, peer := range pruned { | ||||||
| 				log.Debugf("Network peer exceeded prune time: %s", id) | 				log.Debugf("Network peer exceeded prune time: %s", id) | ||||||
|  | 				n.Lock() | ||||||
|  | 				delete(n.peerLinks, peer.address) | ||||||
|  | 				n.Unlock() | ||||||
| 				if err := n.prunePeerRoutes(peer); err != nil { | 				if err := n.prunePeerRoutes(peer); err != nil { | ||||||
| 					log.Debugf("Network failed pruning peer %s routes: %v", id, err) | 					log.Debugf("Network failed pruning peer %s routes: %v", id, err) | ||||||
| 				} | 				} | ||||||
| @@ -528,7 +586,7 @@ func (n *network) prune() { | |||||||
| 			// get a list of all routes | 			// get a list of all routes | ||||||
| 			routes, err := n.options.Router.Table().List() | 			routes, err := n.options.Router.Table().List() | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				log.Debugf("Network failed listing routes: %v", err) | 				log.Debugf("Network failed listing routes when pruning peers: %v", err) | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 			// collect all the router IDs in the routing table | 			// collect all the router IDs in the routing table | ||||||
| @@ -549,16 +607,22 @@ func (n *network) prune() { | |||||||
| } | } | ||||||
|  |  | ||||||
| // handleCtrlConn handles ControlChannel connections | // handleCtrlConn handles ControlChannel connections | ||||||
| func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Message) { | func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { | ||||||
| 	for { | 	for { | ||||||
| 		m := new(transport.Message) | 		m := new(transport.Message) | ||||||
| 		if err := sess.Recv(m); err != nil { | 		if err := s.Recv(m); err != nil { | ||||||
| 			log.Debugf("Network tunnel advert receive error: %v", err) | 			log.Debugf("Network tunnel [%s] receive error: %v", ControlChannel, err) | ||||||
|  | 			if sessionErr := s.Close(); sessionErr != nil { | ||||||
|  | 				log.Debugf("Network tunnel [%s] closing connection error: %v", ControlChannel, sessionErr) | ||||||
|  | 			} | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		select { | 		select { | ||||||
| 		case msg <- m: | 		case msg <- &message{ | ||||||
|  | 			msg:     m, | ||||||
|  | 			session: s, | ||||||
|  | 		}: | ||||||
| 		case <-n.closed: | 		case <-n.closed: | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
| @@ -566,7 +630,7 @@ func (n *network) handleCtrlConn(sess tunnel.Session, msg chan *transport.Messag | |||||||
| } | } | ||||||
|  |  | ||||||
| // acceptCtrlConn accepts connections from ControlChannel | // acceptCtrlConn accepts connections from ControlChannel | ||||||
| func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message) { | func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *message) { | ||||||
| 	var i int | 	var i int | ||||||
| 	for { | 	for { | ||||||
| 		// accept a connection | 		// accept a connection | ||||||
| @@ -596,42 +660,79 @@ func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // setRouteMetric calculates metric of the route and updates it in place | // getHopCount queries network graph and returns hop count for given router | ||||||
| // - Local route metric is 1 | // - Routes for local services have hop count 1 | ||||||
| // - Routes with ID of adjacent nodes are 10 | // - Routes with ID of adjacent nodes have hop count 2 | ||||||
| // - Routes by peers of the advertiser are 100 | // - Routes by peers of the advertiser have hop count 3 | ||||||
| // - Routes beyond your neighbourhood are 1000 | // - Routes beyond node neighbourhood have hop count 4 | ||||||
| func (n *network) setRouteMetric(route *router.Route) { | func (n *network) getHopCount(rtr string) int { | ||||||
|  | 	// make sure node.peers are not modified | ||||||
|  | 	n.node.RLock() | ||||||
|  | 	defer n.node.RUnlock() | ||||||
|  |  | ||||||
| 	// we are the origin of the route | 	// we are the origin of the route | ||||||
| 	if route.Router == n.options.Id { | 	if rtr == n.options.Id { | ||||||
| 		route.Metric = 1 | 		return 1 | ||||||
| 		return |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// check if the route origin is our peer | 	// the route origin is our peer | ||||||
| 	if _, ok := n.peers[route.Router]; ok { | 	if _, ok := n.peers[rtr]; ok { | ||||||
| 		route.Metric = 10 | 		return 2 | ||||||
| 		return |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// check if the route origin is the peer of our peer | 	// the route origin is the peer of our peer | ||||||
| 	for _, peer := range n.peers { | 	for _, peer := range n.peers { | ||||||
| 		for id := range peer.peers { | 		for id := range peer.peers { | ||||||
| 			if route.Router == id { | 			if rtr == id { | ||||||
| 				route.Metric = 100 | 				return 3 | ||||||
| 				return |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	// otherwise we are three hops away | ||||||
|  | 	return 4 | ||||||
|  | } | ||||||
|  |  | ||||||
| 	// the origin of the route is beyond our neighbourhood | // getRouteMetric calculates router metric and returns it | ||||||
| 	route.Metric = 1000 | // Route metric is calculated based on link status and route hopd count | ||||||
|  | func (n *network) getRouteMetric(router string, gateway string, link string) int64 { | ||||||
|  | 	// set the route metric | ||||||
|  | 	n.RLock() | ||||||
|  | 	defer n.RUnlock() | ||||||
|  |  | ||||||
|  | 	if link == "local" && gateway == "" { | ||||||
|  | 		log.Debugf("Network link: %s, gateway: blank", link) | ||||||
|  | 		return 1 | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if link == "local" && gateway != "" { | ||||||
|  | 		log.Debugf("Network link: %s, gateway: %s", link, gateway) | ||||||
|  | 		return 2 | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	log.Debugf("Network looking up %s link to gateway: %s", link, gateway) | ||||||
|  | 	if link, ok := n.peerLinks[gateway]; ok { | ||||||
|  | 		// maka sure delay is non-zero | ||||||
|  | 		delay := link.Delay() | ||||||
|  | 		if delay == 0 { | ||||||
|  | 			delay = 1 | ||||||
|  | 		} | ||||||
|  | 		// get the route hop count | ||||||
|  | 		hops := n.getHopCount(router) | ||||||
|  | 		// make sure length is non-zero | ||||||
|  | 		length := link.Length() | ||||||
|  | 		if length == 0 { | ||||||
|  | 			length = 10e10 | ||||||
|  | 		} | ||||||
|  | 		return (delay * length * int64(hops)) / 10e9 | ||||||
|  | 	} | ||||||
|  | 	log.Debugf("Network failed to find a link to gateway: %s", gateway) | ||||||
|  | 	return math.MaxInt64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // processCtrlChan processes messages received on ControlChannel | // processCtrlChan processes messages received on ControlChannel | ||||||
| func (n *network) processCtrlChan(listener tunnel.Listener) { | func (n *network) processCtrlChan(listener tunnel.Listener) { | ||||||
| 	// receive control message queue | 	// receive control message queue | ||||||
| 	recv := make(chan *transport.Message, 128) | 	recv := make(chan *message, 128) | ||||||
|  |  | ||||||
| 	// accept ControlChannel cconnections | 	// accept ControlChannel cconnections | ||||||
| 	go n.acceptCtrlConn(listener, recv) | 	go n.acceptCtrlConn(listener, recv) | ||||||
| @@ -640,10 +741,10 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { | |||||||
| 		select { | 		select { | ||||||
| 		case m := <-recv: | 		case m := <-recv: | ||||||
| 			// switch on type of message and take action | 			// switch on type of message and take action | ||||||
| 			switch m.Header["Micro-Method"] { | 			switch m.msg.Header["Micro-Method"] { | ||||||
| 			case "advert": | 			case "advert": | ||||||
| 				pbRtrAdvert := &pbRtr.Advert{} | 				pbRtrAdvert := &pbRtr.Advert{} | ||||||
| 				if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { | 				if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil { | ||||||
| 					log.Debugf("Network fail to unmarshal advert message: %v", err) | 					log.Debugf("Network fail to unmarshal advert message: %v", err) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| @@ -678,16 +779,15 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { | |||||||
| 						Network: event.Route.Network, | 						Network: event.Route.Network, | ||||||
| 						Router:  event.Route.Router, | 						Router:  event.Route.Router, | ||||||
| 						Link:    event.Route.Link, | 						Link:    event.Route.Link, | ||||||
| 						Metric:  int(event.Route.Metric), | 						Metric:  event.Route.Metric, | ||||||
| 					} | 					} | ||||||
| 					// set the route metric | 					// calculate route metric and add to the advertised metric | ||||||
| 					n.node.RLock() | 					// we need to make sure we do not overflow math.MaxInt64 | ||||||
| 					n.setRouteMetric(&route) | 					log.Debugf("Network metric for router %s and gateway %s", event.Route.Router, event.Route.Gateway) | ||||||
| 					n.node.RUnlock() | 					if metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link); metric != math.MaxInt64 { | ||||||
| 					// throw away metric bigger than 1000 | 						route.Metric += metric | ||||||
| 					if route.Metric > 1000 { | 					} else { | ||||||
| 						log.Debugf("Network route metric %d dropping node: %s", route.Metric, route.Router) | 						route.Metric = metric | ||||||
| 						continue |  | ||||||
| 					} | 					} | ||||||
| 					// create router event | 					// create router event | ||||||
| 					e := &router.Event{ | 					e := &router.Event{ | ||||||
| @@ -717,7 +817,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { | |||||||
| 				} | 				} | ||||||
| 			case "solicit": | 			case "solicit": | ||||||
| 				pbRtrSolicit := &pbRtr.Solicit{} | 				pbRtrSolicit := &pbRtr.Solicit{} | ||||||
| 				if err := proto.Unmarshal(m.Body, pbRtrSolicit); err != nil { | 				if err := proto.Unmarshal(m.msg.Body, pbRtrSolicit); err != nil { | ||||||
| 					log.Debugf("Network fail to unmarshal solicit message: %v", err) | 					log.Debugf("Network fail to unmarshal solicit message: %v", err) | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| @@ -758,9 +858,9 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { | |||||||
| 					hasher.Write([]byte(event.Route.Address + n.node.id)) | 					hasher.Write([]byte(event.Route.Address + n.node.id)) | ||||||
| 					address = fmt.Sprintf("%d", hasher.Sum64()) | 					address = fmt.Sprintf("%d", hasher.Sum64()) | ||||||
| 				} | 				} | ||||||
|  | 				// calculate route metric to advertise | ||||||
|  | 				metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link) | ||||||
| 				// NOTE: we override Gateway, Link and Address here | 				// NOTE: we override Gateway, Link and Address here | ||||||
| 				// TODO: should we avoid overriding gateway? |  | ||||||
| 				route := &pbRtr.Route{ | 				route := &pbRtr.Route{ | ||||||
| 					Service: event.Route.Service, | 					Service: event.Route.Service, | ||||||
| 					Address: address, | 					Address: address, | ||||||
| @@ -768,7 +868,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { | |||||||
| 					Network: event.Route.Network, | 					Network: event.Route.Network, | ||||||
| 					Router:  event.Route.Router, | 					Router:  event.Route.Router, | ||||||
| 					Link:    DefaultLink, | 					Link:    DefaultLink, | ||||||
| 					Metric:  int64(event.Route.Metric), | 					Metric:  metric, | ||||||
| 				} | 				} | ||||||
| 				e := &pbRtr.Event{ | 				e := &pbRtr.Event{ | ||||||
| 					Type:      pbRtr.EventType(event.Type), | 					Type:      pbRtr.EventType(event.Type), | ||||||
|   | |||||||
| @@ -6,6 +6,8 @@ import ( | |||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/client" | 	"github.com/micro/go-micro/client" | ||||||
| 	"github.com/micro/go-micro/server" | 	"github.com/micro/go-micro/server" | ||||||
|  | 	"github.com/micro/go-micro/transport" | ||||||
|  | 	"github.com/micro/go-micro/tunnel" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| @@ -54,6 +56,14 @@ type Network interface { | |||||||
| 	Server() server.Server | 	Server() server.Server | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // message is network message | ||||||
|  | type message struct { | ||||||
|  | 	// msg is transport message | ||||||
|  | 	msg *transport.Message | ||||||
|  | 	// session is tunnel session | ||||||
|  | 	session tunnel.Session | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewNetwork returns a new network interface | // NewNetwork returns a new network interface | ||||||
| func NewNetwork(opts ...Option) Network { | func NewNetwork(opts ...Option) Network { | ||||||
| 	return newNetwork(opts...) | 	return newNetwork(opts...) | ||||||
|   | |||||||
| @@ -31,7 +31,7 @@ func (r *Router) Lookup(ctx context.Context, req *pb.LookupRequest, resp *pb.Loo | |||||||
| 			Network: route.Network, | 			Network: route.Network, | ||||||
| 			Router:  route.Router, | 			Router:  route.Router, | ||||||
| 			Link:    route.Link, | 			Link:    route.Link, | ||||||
| 			Metric:  int64(route.Metric), | 			Metric:  route.Metric, | ||||||
| 		} | 		} | ||||||
| 		respRoutes = append(respRoutes, respRoute) | 		respRoutes = append(respRoutes, respRoute) | ||||||
| 	} | 	} | ||||||
| @@ -67,7 +67,7 @@ func (r *Router) Advertise(ctx context.Context, req *pb.Request, stream pb.Route | |||||||
| 				Network: event.Route.Network, | 				Network: event.Route.Network, | ||||||
| 				Router:  event.Route.Router, | 				Router:  event.Route.Router, | ||||||
| 				Link:    event.Route.Link, | 				Link:    event.Route.Link, | ||||||
| 				Metric:  int64(event.Route.Metric), | 				Metric:  event.Route.Metric, | ||||||
| 			} | 			} | ||||||
| 			e := &pb.Event{ | 			e := &pb.Event{ | ||||||
| 				Type:      pb.EventType(event.Type), | 				Type:      pb.EventType(event.Type), | ||||||
| @@ -108,7 +108,7 @@ func (r *Router) Process(ctx context.Context, req *pb.Advert, rsp *pb.ProcessRes | |||||||
| 			Network: event.Route.Network, | 			Network: event.Route.Network, | ||||||
| 			Router:  event.Route.Router, | 			Router:  event.Route.Router, | ||||||
| 			Link:    event.Route.Link, | 			Link:    event.Route.Link, | ||||||
| 			Metric:  int(event.Route.Metric), | 			Metric:  event.Route.Metric, | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		events[i] = &router.Event{ | 		events[i] = &router.Event{ | ||||||
| @@ -174,7 +174,7 @@ func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Rout | |||||||
| 			Network: event.Route.Network, | 			Network: event.Route.Network, | ||||||
| 			Router:  event.Route.Router, | 			Router:  event.Route.Router, | ||||||
| 			Link:    event.Route.Link, | 			Link:    event.Route.Link, | ||||||
| 			Metric:  int64(event.Route.Metric), | 			Metric:  event.Route.Metric, | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		tableEvent := &pb.Event{ | 		tableEvent := &pb.Event{ | ||||||
|   | |||||||
| @@ -20,7 +20,7 @@ func (t *Table) Create(ctx context.Context, route *pb.Route, resp *pb.CreateResp | |||||||
| 		Network: route.Network, | 		Network: route.Network, | ||||||
| 		Router:  route.Router, | 		Router:  route.Router, | ||||||
| 		Link:    route.Link, | 		Link:    route.Link, | ||||||
| 		Metric:  int(route.Metric), | 		Metric:  route.Metric, | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.router", "failed to create route: %s", err) | 		return errors.InternalServerError("go.micro.router", "failed to create route: %s", err) | ||||||
| @@ -37,7 +37,7 @@ func (t *Table) Update(ctx context.Context, route *pb.Route, resp *pb.UpdateResp | |||||||
| 		Network: route.Network, | 		Network: route.Network, | ||||||
| 		Router:  route.Router, | 		Router:  route.Router, | ||||||
| 		Link:    route.Link, | 		Link:    route.Link, | ||||||
| 		Metric:  int(route.Metric), | 		Metric:  route.Metric, | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.router", "failed to update route: %s", err) | 		return errors.InternalServerError("go.micro.router", "failed to update route: %s", err) | ||||||
| @@ -54,7 +54,7 @@ func (t *Table) Delete(ctx context.Context, route *pb.Route, resp *pb.DeleteResp | |||||||
| 		Network: route.Network, | 		Network: route.Network, | ||||||
| 		Router:  route.Router, | 		Router:  route.Router, | ||||||
| 		Link:    route.Link, | 		Link:    route.Link, | ||||||
| 		Metric:  int(route.Metric), | 		Metric:  route.Metric, | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return errors.InternalServerError("go.micro.router", "failed to delete route: %s", err) | 		return errors.InternalServerError("go.micro.router", "failed to delete route: %s", err) | ||||||
| @@ -79,7 +79,7 @@ func (t *Table) List(ctx context.Context, req *pb.Request, resp *pb.ListResponse | |||||||
| 			Network: route.Network, | 			Network: route.Network, | ||||||
| 			Router:  route.Router, | 			Router:  route.Router, | ||||||
| 			Link:    route.Link, | 			Link:    route.Link, | ||||||
| 			Metric:  int64(route.Metric), | 			Metric:  route.Metric, | ||||||
| 		} | 		} | ||||||
| 		respRoutes = append(respRoutes, respRoute) | 		respRoutes = append(respRoutes, respRoute) | ||||||
| 	} | 	} | ||||||
| @@ -104,7 +104,7 @@ func (t *Table) Query(ctx context.Context, req *pb.QueryRequest, resp *pb.QueryR | |||||||
| 			Network: route.Network, | 			Network: route.Network, | ||||||
| 			Router:  route.Router, | 			Router:  route.Router, | ||||||
| 			Link:    route.Link, | 			Link:    route.Link, | ||||||
| 			Metric:  int64(route.Metric), | 			Metric:  route.Metric, | ||||||
| 		} | 		} | ||||||
| 		respRoutes = append(respRoutes, respRoute) | 		respRoutes = append(respRoutes, respRoute) | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -8,9 +8,7 @@ var ( | |||||||
| 	// DefaultLink is default network link | 	// DefaultLink is default network link | ||||||
| 	DefaultLink = "local" | 	DefaultLink = "local" | ||||||
| 	// DefaultLocalMetric is default route cost for a local route | 	// DefaultLocalMetric is default route cost for a local route | ||||||
| 	DefaultLocalMetric = 1 | 	DefaultLocalMetric int64 = 1 | ||||||
| 	// DefaultNetworkMetric is default route cost for a network route |  | ||||||
| 	DefaultNetworkMetric = 10 |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Route is network route | // Route is network route | ||||||
| @@ -28,7 +26,7 @@ type Route struct { | |||||||
| 	// Link is network link | 	// Link is network link | ||||||
| 	Link string | 	Link string | ||||||
| 	// Metric is the route cost metric | 	// Metric is the route cost metric | ||||||
| 	Metric int | 	Metric int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // Hash returns route hash sum. | // Hash returns route hash sum. | ||||||
|   | |||||||
| @@ -132,7 +132,7 @@ func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_A | |||||||
| 				Gateway: event.Route.Gateway, | 				Gateway: event.Route.Gateway, | ||||||
| 				Network: event.Route.Network, | 				Network: event.Route.Network, | ||||||
| 				Link:    event.Route.Link, | 				Link:    event.Route.Link, | ||||||
| 				Metric:  int(event.Route.Metric), | 				Metric:  event.Route.Metric, | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			events[i] = &router.Event{ | 			events[i] = &router.Event{ | ||||||
| @@ -196,7 +196,7 @@ func (s *svc) Process(advert *router.Advert) error { | |||||||
| 			Gateway: event.Route.Gateway, | 			Gateway: event.Route.Gateway, | ||||||
| 			Network: event.Route.Network, | 			Network: event.Route.Network, | ||||||
| 			Link:    event.Route.Link, | 			Link:    event.Route.Link, | ||||||
| 			Metric:  int64(event.Route.Metric), | 			Metric:  event.Route.Metric, | ||||||
| 		} | 		} | ||||||
| 		e := &pb.Event{ | 		e := &pb.Event{ | ||||||
| 			Type:      pb.EventType(event.Type), | 			Type:      pb.EventType(event.Type), | ||||||
| @@ -346,7 +346,7 @@ func (s *svc) Lookup(q ...router.QueryOption) ([]router.Route, error) { | |||||||
| 			Gateway: route.Gateway, | 			Gateway: route.Gateway, | ||||||
| 			Network: route.Network, | 			Network: route.Network, | ||||||
| 			Link:    route.Link, | 			Link:    route.Link, | ||||||
| 			Metric:  int(route.Metric), | 			Metric:  route.Metric, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -21,7 +21,7 @@ func (t *table) Create(r router.Route) error { | |||||||
| 		Gateway: r.Gateway, | 		Gateway: r.Gateway, | ||||||
| 		Network: r.Network, | 		Network: r.Network, | ||||||
| 		Link:    r.Link, | 		Link:    r.Link, | ||||||
| 		Metric:  int64(r.Metric), | 		Metric:  r.Metric, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if _, err := t.table.Create(context.Background(), route, t.callOpts...); err != nil { | 	if _, err := t.table.Create(context.Background(), route, t.callOpts...); err != nil { | ||||||
| @@ -39,7 +39,7 @@ func (t *table) Delete(r router.Route) error { | |||||||
| 		Gateway: r.Gateway, | 		Gateway: r.Gateway, | ||||||
| 		Network: r.Network, | 		Network: r.Network, | ||||||
| 		Link:    r.Link, | 		Link:    r.Link, | ||||||
| 		Metric:  int64(r.Metric), | 		Metric:  r.Metric, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if _, err := t.table.Delete(context.Background(), route, t.callOpts...); err != nil { | 	if _, err := t.table.Delete(context.Background(), route, t.callOpts...); err != nil { | ||||||
| @@ -57,7 +57,7 @@ func (t *table) Update(r router.Route) error { | |||||||
| 		Gateway: r.Gateway, | 		Gateway: r.Gateway, | ||||||
| 		Network: r.Network, | 		Network: r.Network, | ||||||
| 		Link:    r.Link, | 		Link:    r.Link, | ||||||
| 		Metric:  int64(r.Metric), | 		Metric:  r.Metric, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if _, err := t.table.Update(context.Background(), route, t.callOpts...); err != nil { | 	if _, err := t.table.Update(context.Background(), route, t.callOpts...); err != nil { | ||||||
| @@ -82,7 +82,7 @@ func (t *table) List() ([]router.Route, error) { | |||||||
| 			Gateway: route.Gateway, | 			Gateway: route.Gateway, | ||||||
| 			Network: route.Network, | 			Network: route.Network, | ||||||
| 			Link:    route.Link, | 			Link:    route.Link, | ||||||
| 			Metric:  int(route.Metric), | 			Metric:  route.Metric, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -115,7 +115,7 @@ func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) { | |||||||
| 			Gateway: route.Gateway, | 			Gateway: route.Gateway, | ||||||
| 			Network: route.Network, | 			Network: route.Network, | ||||||
| 			Link:    route.Link, | 			Link:    route.Link, | ||||||
| 			Metric:  int(route.Metric), | 			Metric:  route.Metric, | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -61,7 +61,7 @@ func (w *watcher) watch(stream pb.Router_WatchService) error { | |||||||
| 			Gateway: resp.Route.Gateway, | 			Gateway: resp.Route.Gateway, | ||||||
| 			Network: resp.Route.Network, | 			Network: resp.Route.Network, | ||||||
| 			Link:    resp.Route.Link, | 			Link:    resp.Route.Link, | ||||||
| 			Metric:  int(resp.Route.Metric), | 			Metric:  resp.Route.Metric, | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		event := &router.Event{ | 		event := &router.Event{ | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user