Cleanup and speedup network convergence along with direct messaging for connect and solicit

This commit is contained in:
Asim Aslam 2019-12-07 19:54:29 +00:00
parent 1d8c66780e
commit c445aed6b1
17 changed files with 494 additions and 199 deletions

View File

@ -316,6 +316,22 @@ func (r *rpcClient) Options() Options {
return r.opts return r.opts
} }
// hasProxy checks if we have proxy set in the environment
func (r *rpcClient) hasProxy() bool {
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
return true
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
return true
}
return false
}
// next returns an iterator for the next nodes to call
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
service := request.Service() service := request.Service()
@ -431,10 +447,18 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return err return err
} }
ch := make(chan error, callOpts.Retries+1) // get the retries
retries := callOpts.Retries
// disable retries when using a proxy
if r.hasProxy() {
retries = 0
}
ch := make(chan error, retries+1)
var gerr error var gerr error
for i := 0; i <= callOpts.Retries; i++ { for i := 0; i <= retries; i++ {
go func(i int) { go func(i int) {
ch <- call(i) ch <- call(i)
}(i) }(i)
@ -514,10 +538,18 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
err error err error
} }
ch := make(chan response, callOpts.Retries+1) // get the retries
retries := callOpts.Retries
// disable retries when using a proxy
if r.hasProxy() {
retries = 0
}
ch := make(chan response, retries+1)
var grr error var grr error
for i := 0; i <= callOpts.Retries; i++ { for i := 0; i <= retries; i++ {
go func(i int) { go func(i int) {
s, err := call(i) s, err := call(i)
ch <- response{s, err} ch <- response{s, err}

View File

@ -88,32 +88,24 @@ func (rwc *readWriteCloser) Close() error {
} }
func getHeaders(m *codec.Message) { func getHeaders(m *codec.Message) {
get := func(hdr string) string { set := func(v, hdr string) string {
if hd := m.Header[hdr]; len(hd) > 0 { if len(v) > 0 {
return hd return v
} }
// old return m.Header[hdr]
return m.Header["X-"+hdr]
} }
// check error in header // check error in header
if len(m.Error) == 0 { m.Error = set(m.Error, "Micro-Error")
m.Error = get("Micro-Error")
}
// check endpoint in header // check endpoint in header
if len(m.Endpoint) == 0 { m.Endpoint = set(m.Endpoint, "Micro-Endpoint")
m.Endpoint = get("Micro-Endpoint")
}
// check method in header // check method in header
if len(m.Method) == 0 { m.Method = set(m.Method, "Micro-Method")
m.Method = get("Micro-Method")
}
if len(m.Id) == 0 { // set the request id
m.Id = get("Micro-Id") m.Id = set(m.Id, "Micro-Id")
}
} }
func setHeaders(m *codec.Message, stream string) { func setHeaders(m *codec.Message, stream string) {
@ -122,7 +114,6 @@ func setHeaders(m *codec.Message, stream string) {
return return
} }
m.Header[hdr] = v m.Header[hdr] = v
m.Header["X-"+hdr] = v
} }
set("Micro-Id", m.Id) set("Micro-Id", m.Id)

View File

@ -71,9 +71,18 @@ type network struct {
connected bool connected bool
// closed closes the network // closed closes the network
closed chan bool closed chan bool
// whether we've announced the first connect successfully // whether we've discovered by the network
// and received back some sort of peer message discovered chan bool
announced chan bool // solicted checks whether routes were solicited by one node
solicited chan string
}
// message is network message
type message struct {
// msg is transport message
msg *transport.Message
// session is tunnel session
session tunnel.Session
} }
// newNetwork returns a new network node // newNetwork returns a new network node
@ -149,14 +158,16 @@ func newNetwork(opts ...Option) Network {
address: peerAddress, address: peerAddress,
peers: make(map[string]*node), peers: make(map[string]*node),
}, },
options: options, options: options,
router: options.Router, router: options.Router,
proxy: options.Proxy, proxy: options.Proxy,
tunnel: options.Tunnel, tunnel: options.Tunnel,
server: server, server: server,
client: client, client: client,
tunClient: make(map[string]transport.Client), tunClient: make(map[string]transport.Client),
peerLinks: make(map[string]tunnel.Link), peerLinks: make(map[string]tunnel.Link),
discovered: make(chan bool, 1),
solicited: make(chan string, 1),
} }
network.node.network = network network.node.network = network
@ -278,6 +289,14 @@ func (n *network) handleNetConn(s tunnel.Session, msg chan *message) {
continue continue
} }
// check if peer is set
peer := m.Header["Micro-Peer"]
// check who the message is intended for
if len(peer) > 0 && peer != n.options.Id {
continue
}
select { select {
case msg <- &message{ case msg <- &message{
msg: m, msg: m,
@ -367,26 +386,33 @@ func (n *network) processNetChan(listener tunnel.Listener) {
// mark the time the message has been received // mark the time the message has been received
now := time.Now() now := time.Now()
pbNetConnect := &pbNet.Connect{} pbNetConnect := &pbNet.Connect{}
if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil { if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil {
log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err)
continue continue
} }
// don't process your own messages // don't process your own messages
if pbNetConnect.Node.Id == n.options.Id { if pbNetConnect.Node.Id == n.options.Id {
continue continue
} }
log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id)
peer := &node{ peer := &node{
id: pbNetConnect.Node.Id, id: pbNetConnect.Node.Id,
address: pbNetConnect.Node.Address, address: pbNetConnect.Node.Address,
peers: make(map[string]*node), peers: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
// update peer links // update peer links
log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetConnect.Node.Address) log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetConnect.Node.Address)
if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil { if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil {
log.Debugf("Network failed updating peer links: %s", err) log.Debugf("Network failed updating peer links: %s", err)
} }
// add peer to the list of node peers // add peer to the list of node peers
if err := n.node.AddPeer(peer); err == ErrPeerExists { if err := n.node.AddPeer(peer); err == ErrPeerExists {
log.Debugf("Network peer exists, refreshing: %s", peer.id) log.Debugf("Network peer exists, refreshing: %s", peer.id)
@ -394,50 +420,76 @@ func (n *network) processNetChan(listener tunnel.Listener) {
if err := n.RefreshPeer(peer.id, now); err != nil { if err := n.RefreshPeer(peer.id, now); err != nil {
log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) log.Debugf("Network failed refreshing peer %s: %v", peer.id, err)
} }
continue
} }
// we send the peer message because someone has sent connect
// and wants to know what's on the network. The faster we
// respond the faster we start to converge
// get node peers down to MaxDepth encoded in protobuf // get node peers down to MaxDepth encoded in protobuf
msg := PeersToProto(n.node, MaxDepth) msg := PeersToProto(n.node, MaxDepth)
node := pbNetConnect.Node.Id
// advertise yourself to the network // advertise yourself to the network
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { if err := n.sendTo("peer", NetworkChannel, node, msg); err != nil {
log.Debugf("Network failed to advertise peers: %v", err) log.Debugf("Network failed to advertise peers: %v", err)
} }
// advertise all the routes when a new node has connected // advertise all the routes when a new node has connected
if err := n.router.Solicit(); err != nil { if err := n.router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err) log.Debugf("Network failed to solicit routes: %s", err)
} }
// specify that we're soliciting
select {
case n.solicited <- node:
default:
// don't block
}
case "peer": case "peer":
// mark the time the message has been received // mark the time the message has been received
now := time.Now() now := time.Now()
pbNetPeer := &pbNet.Peer{} pbNetPeer := &pbNet.Peer{}
if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil { if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil {
log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err)
continue continue
} }
// don't process your own messages // don't process your own messages
if pbNetPeer.Node.Id == n.options.Id { if pbNetPeer.Node.Id == n.options.Id {
continue continue
} }
log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address) log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address)
peer := &node{ peer := &node{
id: pbNetPeer.Node.Id, id: pbNetPeer.Node.Id,
address: pbNetPeer.Node.Address, address: pbNetPeer.Node.Address,
peers: make(map[string]*node), peers: make(map[string]*node),
lastSeen: now, lastSeen: now,
} }
// update peer links // update peer links
log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetPeer.Node.Address) log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetPeer.Node.Address)
if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil { if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil {
log.Debugf("Network failed updating peer links: %s", err) log.Debugf("Network failed updating peer links: %s", err)
} }
if err := n.node.AddPeer(peer); err == nil { if err := n.node.AddPeer(peer); err == nil {
// send a solicit message when discovering new peer // send a solicit message when discovering new peer
msg := &pbRtr.Solicit{ msg := &pbRtr.Solicit{
Id: n.options.Id, Id: n.options.Id,
} }
if err := n.sendMsg("solicit", msg, ControlChannel); err != nil {
node := pbNetPeer.Node.Id
// only solicit this peer
if err := n.sendTo("solicit", ControlChannel, node, msg); err != nil {
log.Debugf("Network failed to send solicit message: %s", err) log.Debugf("Network failed to send solicit message: %s", err)
} }
continue continue
// we're expecting any error to be ErrPeerExists // we're expecting any error to be ErrPeerExists
} else if err != ErrPeerExists { } else if err != ErrPeerExists {
@ -446,6 +498,7 @@ func (n *network) processNetChan(listener tunnel.Listener) {
} }
log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id)
// update lastSeen time for the peer // update lastSeen time for the peer
if err := n.RefreshPeer(pbNetPeer.Node.Id, now); err != nil { if err := n.RefreshPeer(pbNetPeer.Node.Id, now); err != nil {
log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err)
@ -458,12 +511,12 @@ func (n *network) processNetChan(listener tunnel.Listener) {
log.Debugf("Network failed to update peers: %v", err) log.Debugf("Network failed to update peers: %v", err)
} }
// check if we announced and were discovered // tell the connect loop that we've been discovered
// so it stops sending connect messages out
select { select {
case <-n.announced: case n.discovered <- true:
// we've sent the connect and received this response
default: default:
close(n.announced) // don't block here
} }
case "close": case "close":
pbNetClose := &pbNet.Close{} pbNetClose := &pbNet.Close{}
@ -471,22 +524,28 @@ func (n *network) processNetChan(listener tunnel.Listener) {
log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err)
continue continue
} }
// don't process your own messages // don't process your own messages
if pbNetClose.Node.Id == n.options.Id { if pbNetClose.Node.Id == n.options.Id {
continue continue
} }
log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) log.Debugf("Network received close message from: %s", pbNetClose.Node.Id)
peer := &node{ peer := &node{
id: pbNetClose.Node.Id, id: pbNetClose.Node.Id,
address: pbNetClose.Node.Address, address: pbNetClose.Node.Address,
} }
if err := n.DeletePeerNode(peer.id); err != nil { if err := n.DeletePeerNode(peer.id); err != nil {
log.Debugf("Network failed to delete node %s routes: %v", peer.id, err) log.Debugf("Network failed to delete node %s routes: %v", peer.id, err)
} }
if err := n.prunePeerRoutes(peer); err != nil { if err := n.prunePeerRoutes(peer); err != nil {
log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err)
} }
// deelete peer from the peerLinks
// delete peer from the peerLinks
n.Lock() n.Lock()
delete(n.peerLinks, pbNetClose.Node.Address) delete(n.peerLinks, pbNetClose.Node.Address)
n.Unlock() n.Unlock()
@ -497,57 +556,6 @@ func (n *network) processNetChan(listener tunnel.Listener) {
} }
} }
// sendMsg sends a message to the tunnel channel
func (n *network) sendMsg(method string, msg proto.Message, channel string) error {
body, err := proto.Marshal(msg)
if err != nil {
return err
}
// create transport message and chuck it down the pipe
m := transport.Message{
Header: map[string]string{
"Micro-Method": method,
},
Body: body,
}
// check if the channel client is initialized
n.RLock()
client, ok := n.tunClient[channel]
if !ok || client == nil {
n.RUnlock()
return ErrClientNotFound
}
n.RUnlock()
log.Debugf("Network sending %s message from: %s", method, n.options.Id)
if err := client.Send(&m); err != nil {
return err
}
return nil
}
// announce announces node peers to the network
func (n *network) announce(client transport.Client) {
announce := time.NewTicker(AnnounceTime)
defer announce.Stop()
for {
select {
case <-n.closed:
return
case <-announce.C:
msg := PeersToProto(n.node, MaxDepth)
// advertise yourself to the network
if err := n.sendMsg("peer", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to advertise peers: %v", err)
continue
}
}
}
}
// pruneRoutes prunes routes return by given query // pruneRoutes prunes routes return by given query
func (n *network) pruneRoutes(q ...router.QueryOption) error { func (n *network) pruneRoutes(q ...router.QueryOption) error {
routes, err := n.router.Table().Query(q...) routes, err := n.router.Table().Query(q...)
@ -585,9 +593,12 @@ func (n *network) prunePeerRoutes(peer *node) error {
return nil return nil
} }
// prune deltes node peers that have not been seen for longer than PruneTime seconds // manage the process of announcing to peers and prune any peer nodes that have not been
// prune also removes all the routes either originated by or routable by the stale nodes // seen for a period of time. Also removes all the routes either originated by or routable
func (n *network) prune() { //by the stale nodes
func (n *network) manage() {
announce := time.NewTicker(AnnounceTime)
defer announce.Stop()
prune := time.NewTicker(PruneTime) prune := time.NewTicker(PruneTime)
defer prune.Stop() defer prune.Stop()
@ -595,8 +606,14 @@ func (n *network) prune() {
select { select {
case <-n.closed: case <-n.closed:
return return
case <-announce.C:
msg := PeersToProto(n.node, MaxDepth)
// advertise yourself to the network
if err := n.sendMsg("peer", NetworkChannel, msg); err != nil {
log.Debugf("Network failed to advertise peers: %v", err)
}
case <-prune.C: case <-prune.C:
pruned := n.PruneStalePeerNodes(PruneTime) pruned := n.PruneStalePeers(PruneTime)
for id, peer := range pruned { for id, peer := range pruned {
log.Debugf("Network peer exceeded prune time: %s", id) log.Debugf("Network peer exceeded prune time: %s", id)
n.Lock() n.Lock()
@ -606,29 +623,90 @@ func (n *network) prune() {
log.Debugf("Network failed pruning peer %s routes: %v", id, err) log.Debugf("Network failed pruning peer %s routes: %v", id, err)
} }
} }
// get a list of all routes // get a list of all routes
routes, err := n.options.Router.Table().List() routes, err := n.options.Router.Table().List()
if err != nil { if err != nil {
log.Debugf("Network failed listing routes when pruning peers: %v", err) log.Debugf("Network failed listing routes when pruning peers: %v", err)
continue continue
} }
// collect all the router IDs in the routing table // collect all the router IDs in the routing table
routers := make(map[string]bool) routers := make(map[string]bool)
for _, route := range routes { for _, route := range routes {
if _, ok := routers[route.Router]; !ok { // check if its been processed
routers[route.Router] = true if _, ok := routers[route.Router]; ok {
// if the router is NOT in our peer graph, delete all routes originated by it continue
if peerNode := n.node.GetPeerNode(route.Router); peerNode == nil { }
if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil {
log.Debugf("Network failed deleting routes by %s: %v", route.Router, err) // mark as processed
} routers[route.Router] = true
}
// if the router is NOT in our peer graph, delete all routes originated by it
if peer := n.node.GetPeerNode(route.Router); peer != nil {
continue
}
if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil {
log.Debugf("Network failed deleting routes by %s: %v", route.Router, err)
} }
} }
} }
} }
} }
// sendTo sends a message to a specific node as a one off.
// we need this because when links die, we have no discovery info,
// and sending to an existing multicast link doesn't immediately work
func (n *network) sendTo(method, channel, peer string, msg proto.Message) error {
body, err := proto.Marshal(msg)
if err != nil {
return err
}
c, err := n.tunnel.Dial(channel, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
defer c.Close()
log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, peer)
return c.Send(&transport.Message{
Header: map[string]string{
"Micro-Method": method,
"Micro-Peer": peer,
},
Body: body,
})
}
// sendMsg sends a message to the tunnel channel
func (n *network) sendMsg(method, channel string, msg proto.Message) error {
body, err := proto.Marshal(msg)
if err != nil {
return err
}
// check if the channel client is initialized
n.RLock()
client, ok := n.tunClient[channel]
if !ok || client == nil {
n.RUnlock()
return ErrClientNotFound
}
n.RUnlock()
log.Debugf("Network sending %s message from: %s", method, n.options.Id)
return client.Send(&transport.Message{
Header: map[string]string{
"Micro-Method": method,
},
Body: body,
})
}
// handleCtrlConn handles ControlChannel connections // handleCtrlConn handles ControlChannel connections
func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) {
for { for {
@ -642,6 +720,14 @@ func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) {
continue continue
} }
// check if peer is set
peer := m.Header["Micro-Peer"]
// check who the message is intended for
if len(peer) > 0 && peer != n.options.Id {
continue
}
select { select {
case msg <- &message{ case msg <- &message{
msg: m, msg: m,
@ -770,15 +856,19 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
switch m.msg.Header["Micro-Method"] { switch m.msg.Header["Micro-Method"] {
case "advert": case "advert":
pbRtrAdvert := &pbRtr.Advert{} pbRtrAdvert := &pbRtr.Advert{}
if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil { if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil {
log.Debugf("Network fail to unmarshal advert message: %v", err) log.Debugf("Network fail to unmarshal advert message: %v", err)
continue continue
} }
// don't process your own messages // don't process your own messages
if pbRtrAdvert.Id == n.options.Id { if pbRtrAdvert.Id == n.options.Id {
continue continue
} }
log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id)
// loookup advertising node in our peer topology // loookup advertising node in our peer topology
advertNode := n.node.GetPeerNode(pbRtrAdvert.Id) advertNode := n.node.GetPeerNode(pbRtrAdvert.Id)
if advertNode == nil { if advertNode == nil {
@ -788,6 +878,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
} }
var events []*router.Event var events []*router.Event
for _, event := range pbRtrAdvert.Events { for _, event := range pbRtrAdvert.Events {
// we know the advertising node is not the origin of the route // we know the advertising node is not the origin of the route
if pbRtrAdvert.Id != event.Route.Router { if pbRtrAdvert.Id != event.Route.Router {
@ -798,6 +889,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
continue continue
} }
} }
route := router.Route{ route := router.Route{
Service: event.Route.Service, Service: event.Route.Service,
Address: event.Route.Address, Address: event.Route.Address,
@ -807,6 +899,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
Link: event.Route.Link, Link: event.Route.Link,
Metric: event.Route.Metric, Metric: event.Route.Metric,
} }
// calculate route metric and add to the advertised metric // calculate route metric and add to the advertised metric
// we need to make sure we do not overflow math.MaxInt64 // we need to make sure we do not overflow math.MaxInt64
metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link) metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link)
@ -829,11 +922,13 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
} }
events = append(events, e) events = append(events, e)
} }
// if no events are eligible for processing continue // if no events are eligible for processing continue
if len(events) == 0 { if len(events) == 0 {
log.Tracef("Network no events to be processed by router: %s", n.options.Id) log.Tracef("Network no events to be processed by router: %s", n.options.Id)
continue continue
} }
// create an advert and process it // create an advert and process it
advert := &router.Advert{ advert := &router.Advert{
Id: pbRtrAdvert.Id, Id: pbRtrAdvert.Id,
@ -853,16 +948,27 @@ func (n *network) processCtrlChan(listener tunnel.Listener) {
log.Debugf("Network fail to unmarshal solicit message: %v", err) log.Debugf("Network fail to unmarshal solicit message: %v", err)
continue continue
} }
log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id) log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id)
// ignore solicitation when requested by you // ignore solicitation when requested by you
if pbRtrSolicit.Id == n.options.Id { if pbRtrSolicit.Id == n.options.Id {
continue continue
} }
log.Debugf("Network router flushing routes for: %s", pbRtrSolicit.Id) log.Debugf("Network router flushing routes for: %s", pbRtrSolicit.Id)
// advertise all the routes when a new node has connected // advertise all the routes when a new node has connected
if err := n.router.Solicit(); err != nil { if err := n.router.Solicit(); err != nil {
log.Debugf("Network failed to solicit routes: %s", err) log.Debugf("Network failed to solicit routes: %s", err)
} }
// specify that someone solicited the route
select {
case n.solicited <- pbRtrSolicit.Id:
default:
// don't block
}
} }
case <-n.closed: case <-n.closed:
return return
@ -879,6 +985,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) {
case advert := <-advertChan: case advert := <-advertChan:
// create a proto advert // create a proto advert
var events []*pbRtr.Event var events []*pbRtr.Event
for _, event := range advert.Events { for _, event := range advert.Events {
// the routes service address // the routes service address
address := event.Route.Address address := event.Route.Address
@ -912,16 +1019,33 @@ func (n *network) advertise(advertChan <-chan *router.Advert) {
} }
events = append(events, e) events = append(events, e)
} }
msg := &pbRtr.Advert{ msg := &pbRtr.Advert{
Id: advert.Id, Id: advert.Id,
Type: pbRtr.AdvertType(advert.Type), Type: pbRtr.AdvertType(advert.Type),
Timestamp: advert.Timestamp.UnixNano(), Timestamp: advert.Timestamp.UnixNano(),
Events: events, Events: events,
} }
if err := n.sendMsg("advert", msg, ControlChannel); err != nil {
log.Debugf("Network failed to advertise routes: %v", err) // send the advert to all on the control channel
// since its not a solicitation
if advert.Type != router.Solicitation {
if err := n.sendMsg("advert", ControlChannel, msg); err != nil {
log.Debugf("Network failed to advertise routes: %v", err)
}
continue continue
} }
// it's a solication, someone asked for it
// so we're going to pick off the node and send it
select {
case node := <-n.solicited:
// someone requested the route
n.sendTo("advert", ControlChannel, node, msg)
default:
// send to all since we can't get anything
n.sendMsg("advert", ControlChannel, msg)
}
case <-n.closed: case <-n.closed:
return return
} }
@ -939,41 +1063,87 @@ func (n *network) sendConnect() {
Address: n.node.address, Address: n.node.address,
}, },
} }
if err := n.sendMsg("connect", msg, NetworkChannel); err != nil {
if err := n.sendMsg("connect", NetworkChannel, msg); err != nil {
log.Debugf("Network failed to send connect message: %s", err) log.Debugf("Network failed to send connect message: %s", err)
} }
} }
// connect will wait for a link to be established // connect will wait for a link to be established and send the connect
// message. We're trying to ensure convergence pretty quickly. So we want
// to hear back. In the case we become completely disconnected we'll
// connect again once a new link is established
func (n *network) connect() { func (n *network) connect() {
// wait for connected state // discovered lets us know what we received a peer message back
var connected bool var discovered bool
var attempts int
// our advertise address
loopback := n.server.Options().Advertise
// actual address
address := n.tunnel.Address()
for { for {
// check the links // connected is used to define if the link is connected
var connected bool
// check the links state
for _, link := range n.tunnel.Links() { for _, link := range n.tunnel.Links() {
// skip loopback
if link.Loopback() {
continue
}
// if remote is ourselves
switch link.Remote() {
case loopback, address:
continue
}
if link.State() == "connected" { if link.State() == "connected" {
connected = true connected = true
break break
} }
} }
// if we're not conencted wait // if we're not connected wait
if !connected { if !connected {
// reset discovered
discovered = false
// sleep for a second
time.Sleep(time.Second) time.Sleep(time.Second)
// now try again
continue continue
} }
// send the connect message // we're connected but are we discovered?
n.sendConnect() if !discovered {
// recreate the clients because all the tunnel links are gone
// so we haven't send discovery beneath
if err := n.createClients(); err != nil {
log.Debugf("Failed to recreate network/control clients: %v", err)
continue
}
// check the announce channel // send the connect message
n.sendConnect()
}
// check if we've been discovered
select { select {
case <-n.announced: case <-n.discovered:
discovered = true
attempts = 0
case <-n.closed:
return return
default: case <-time.After(time.Second + backoff.Do(attempts)):
time.Sleep(time.Second) // we have to try again
// we have to go again attempts++
// reset attempts 5 == ~2mins
if attempts > 5 {
attempts = 0
}
} }
} }
} }
@ -982,18 +1152,18 @@ func (n *network) connect() {
func (n *network) Connect() error { func (n *network) Connect() error {
n.Lock() n.Lock()
// try to resolve network nodes
nodes, err := n.resolveNodes()
if err != nil {
log.Debugf("Network failed to resolve nodes: %v", err)
}
// connect network tunnel // connect network tunnel
if err := n.tunnel.Connect(); err != nil { if err := n.tunnel.Connect(); err != nil {
n.Unlock() n.Unlock()
return err return err
} }
// try to resolve network nodes
nodes, err := n.resolveNodes()
if err != nil {
log.Debugf("Network failed to resolve nodes: %v", err)
}
// initialize the tunnel to resolved nodes // initialize the tunnel to resolved nodes
n.tunnel.Init( n.tunnel.Init(
tunnel.Nodes(nodes...), tunnel.Nodes(nodes...),
@ -1048,8 +1218,6 @@ func (n *network) Connect() error {
// create closed channel // create closed channel
n.closed = make(chan bool) n.closed = make(chan bool)
// create new announced channel
n.announced = make(chan bool)
// start the router // start the router
if err := n.options.Router.Start(); err != nil { if err := n.options.Router.Start(); err != nil {
@ -1073,16 +1241,14 @@ func (n *network) Connect() error {
// send connect after there's a link established // send connect after there's a link established
go n.connect() go n.connect()
// go resolving network nodes // resolve network nodes and re-init the tunnel
go n.resolve() go n.resolve()
// broadcast peers // broadcast announcements and prune stale nodes
go n.announce(netClient) go n.manage()
// prune stale nodes
go n.prune()
// listen to network messages
go n.processNetChan(netListener)
// advertise service routes // advertise service routes
go n.advertise(advertChan) go n.advertise(advertChan)
// listen to network messages
go n.processNetChan(netListener)
// accept and process routes // accept and process routes
go n.processCtrlChan(ctrlListener) go n.processCtrlChan(ctrlListener)
@ -1112,6 +1278,40 @@ func (n *network) close() error {
return nil return nil
} }
// createClients is used to create new clients in the event we lose all the tunnels
func (n *network) createClients() error {
// dial into ControlChannel to send route adverts
ctrlClient, err := n.tunnel.Dial(ControlChannel, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
// dial into NetworkChannel to send network messages
netClient, err := n.tunnel.Dial(NetworkChannel, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
n.Lock()
defer n.Unlock()
// set the control client
c, ok := n.tunClient[ControlChannel]
if ok {
c.Close()
}
n.tunClient[ControlChannel] = ctrlClient
// set the network client
c, ok = n.tunClient[NetworkChannel]
if ok {
c.Close()
}
n.tunClient[NetworkChannel] = netClient
return nil
}
// Close closes network connection // Close closes network connection
func (n *network) Close() error { func (n *network) Close() error {
n.Lock() n.Lock()
@ -1140,7 +1340,7 @@ func (n *network) Close() error {
Address: n.node.address, Address: n.node.address,
}, },
} }
if err := n.sendMsg("close", msg, NetworkChannel); err != nil { if err := n.sendMsg("close", NetworkChannel, msg); err != nil {
log.Debugf("Network failed to send close message: %s", err) log.Debugf("Network failed to send close message: %s", err)
} }
} }

View File

@ -56,14 +56,6 @@ type Network interface {
Server() server.Server Server() server.Server
} }
// message is network message
type message struct {
// msg is transport message
msg *transport.Message
// session is tunnel session
session tunnel.Session
}
// NewNetwork returns a new network interface // NewNetwork returns a new network interface
func NewNetwork(opts ...Option) Network { func NewNetwork(opts ...Option) Network {
return newNetwork(opts...) return newNetwork(opts...)

View File

@ -216,7 +216,7 @@ func (n *node) DeletePeerNode(id string) error {
// PruneStalePeerNodes prune the peers that have not been seen for longer than given time // PruneStalePeerNodes prune the peers that have not been seen for longer than given time
// It returns a map of the the nodes that got pruned // It returns a map of the the nodes that got pruned
func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node { func (n *node) PruneStalePeers(pruneTime time.Duration) map[string]*node {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()

View File

@ -225,7 +225,7 @@ func TestPruneStalePeerNodes(t *testing.T) {
time.Sleep(pruneTime) time.Sleep(pruneTime)
// should delete all nodes besides node // should delete all nodes besides node
pruned := node.PruneStalePeerNodes(pruneTime) pruned := node.PruneStalePeers(pruneTime)
if len(pruned) != len(nodes)-1 { if len(pruned) != len(nodes)-1 {
t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned)) t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned))

View File

@ -83,7 +83,8 @@ func readLoop(r server.Request, s client.Stream) error {
// toNodes returns a list of node addresses from given routes // toNodes returns a list of node addresses from given routes
func toNodes(routes []router.Route) []string { func toNodes(routes []router.Route) []string {
nodes := make([]string, len(routes)) nodes := make([]string, 0, len(routes))
for _, node := range routes { for _, node := range routes {
address := node.Address address := node.Address
if len(node.Gateway) > 0 { if len(node.Gateway) > 0 {
@ -91,11 +92,13 @@ func toNodes(routes []router.Route) []string {
} }
nodes = append(nodes, address) nodes = append(nodes, address)
} }
return nodes return nodes
} }
func toSlice(r map[uint64]router.Route) []router.Route { func toSlice(r map[uint64]router.Route) []router.Route {
routes := make([]router.Route, 0, len(r)) routes := make([]router.Route, 0, len(r))
for _, v := range r { for _, v := range r {
routes = append(routes, v) routes = append(routes, v)
} }
@ -161,6 +164,8 @@ func (p *Proxy) filterRoutes(ctx context.Context, routes []router.Route) []route
filteredRoutes = append(filteredRoutes, route) filteredRoutes = append(filteredRoutes, route)
} }
log.Tracef("Proxy filtered routes %+v vs %+v\n", routes, filteredRoutes)
return filteredRoutes return filteredRoutes
} }
@ -225,13 +230,15 @@ func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) {
// refreshMetrics will refresh any metrics for our local cached routes. // refreshMetrics will refresh any metrics for our local cached routes.
// we may not receive new watch events for these as they change. // we may not receive new watch events for these as they change.
func (p *Proxy) refreshMetrics() { func (p *Proxy) refreshMetrics() {
services := make([]string, 0, len(p.Routes))
// get a list of services to update // get a list of services to update
p.RLock() p.RLock()
services := make([]string, 0, len(p.Routes))
for service := range p.Routes { for service := range p.Routes {
services = append(services, service) services = append(services, service)
} }
p.RUnlock() p.RUnlock()
// get and cache the routes for the service // get and cache the routes for the service
@ -246,6 +253,8 @@ func (p *Proxy) manageRoutes(route router.Route, action string) error {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
log.Tracef("Proxy taking route action %v %+v\n", action, route)
switch action { switch action {
case "create", "update": case "create", "update":
if _, ok := p.Routes[route.Service]; !ok { if _, ok := p.Routes[route.Service]; !ok {
@ -253,7 +262,12 @@ func (p *Proxy) manageRoutes(route router.Route, action string) error {
} }
p.Routes[route.Service][route.Hash()] = route p.Routes[route.Service][route.Hash()] = route
case "delete": case "delete":
// delete that specific route
delete(p.Routes[route.Service], route.Hash()) delete(p.Routes[route.Service], route.Hash())
// clean up the cache entirely
if len(p.Routes[route.Service]) == 0 {
delete(p.Routes, route.Service)
}
default: default:
return fmt.Errorf("unknown action: %s", action) return fmt.Errorf("unknown action: %s", action)
} }
@ -288,7 +302,7 @@ func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error {
// TODO: check that we're not broadcast storming by sending to the same topic // TODO: check that we're not broadcast storming by sending to the same topic
// that we're actually subscribed to // that we're actually subscribed to
log.Tracef("Received message for %s", msg.Topic()) log.Tracef("Proxy received message for %s", msg.Topic())
var errors []string var errors []string
@ -329,7 +343,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
return errors.BadRequest("go.micro.proxy", "service name is blank") return errors.BadRequest("go.micro.proxy", "service name is blank")
} }
log.Tracef("Received request for %s", service) log.Tracef("Proxy received request for %s", service)
// are we network routing or local routing // are we network routing or local routing
if len(p.Links) == 0 { if len(p.Links) == 0 {
@ -363,15 +377,17 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
//nolint:prealloc //nolint:prealloc
var opts []client.CallOption opts := []client.CallOption{
// set strategy to round robin
// set strategy to round robin client.WithSelectOption(selector.WithStrategy(selector.RoundRobin)),
opts = append(opts, client.WithSelectOption(selector.WithStrategy(selector.RoundRobin))) }
// if the address is already set just serve it // if the address is already set just serve it
// TODO: figure it out if we should know to pick a link // TODO: figure it out if we should know to pick a link
if len(addresses) > 0 { if len(addresses) > 0 {
opts = append(opts, client.WithAddress(addresses...)) opts = append(opts,
client.WithAddress(addresses...),
)
// serve the normal way // serve the normal way
return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...) return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...)
@ -387,10 +403,16 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
opts = append(opts, client.WithAddress(addresses...)) opts = append(opts, client.WithAddress(addresses...))
} }
log.Tracef("Proxy calling %+v\n", addresses)
// serve the normal way // serve the normal way
return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...) return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...)
} }
// we're assuming we need routes to operate on
if len(routes) == 0 {
return errors.InternalServerError("go.micro.proxy", "route not found")
}
var gerr error var gerr error
// we're routing globally with multiple links // we're routing globally with multiple links
@ -404,11 +426,16 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
continue continue
} }
log.Debugf("Proxy using route %+v\n", route) log.Tracef("Proxy using route %+v\n", route)
// set the address to call // set the address to call
addresses := toNodes([]router.Route{route}) addresses := toNodes([]router.Route{route})
opts = append(opts, client.WithAddress(addresses...)) // set the address in the options
// disable retries since its one route processing
opts = append(opts,
client.WithAddress(addresses...),
client.WithRetries(0),
)
// do the request with the link // do the request with the link
gerr = p.serveRequest(ctx, link, service, endpoint, req, rsp, opts...) gerr = p.serveRequest(ctx, link, service, endpoint, req, rsp, opts...)
@ -558,7 +585,9 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
}() }()
go func() { go func() {
t := time.NewTicker(time.Minute) // TODO: speed up refreshing of metrics
// without this ticking effort e.g stream
t := time.NewTicker(time.Second * 10)
defer t.Stop() defer t.Stop()
// we must refresh route metrics since they do not trigger new events // we must refresh route metrics since they do not trigger new events

View File

@ -799,7 +799,8 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) {
// build a list of events to advertise // build a list of events to advertise
events := make([]*Event, len(bestRoutes)) events := make([]*Event, len(bestRoutes))
i := 0 var i int
for _, route := range bestRoutes { for _, route := range bestRoutes {
event := &Event{ event := &Event{
Type: evType, Type: evType,
@ -823,9 +824,10 @@ func (r *router) Solicit() error {
// advertise the routes // advertise the routes
r.advertWg.Add(1) r.advertWg.Add(1)
go func() { go func() {
defer r.advertWg.Done() r.publishAdvert(Solicitation, events)
r.publishAdvert(RouteUpdate, events) r.advertWg.Done()
}() }()
return nil return nil

View File

@ -111,6 +111,8 @@ const (
Announce AdvertType = iota Announce AdvertType = iota
// RouteUpdate advertises route updates // RouteUpdate advertises route updates
RouteUpdate RouteUpdate
// Solicitation indicates routes were solicited
Solicitation
) )
// String returns human readable advertisement type // String returns human readable advertisement type
@ -120,6 +122,8 @@ func (t AdvertType) String() string {
return "announce" return "announce"
case RouteUpdate: case RouteUpdate:
return "update" return "update"
case Solicitation:
return "solicitation"
default: default:
return "unknown" return "unknown"
} }

View File

@ -86,24 +86,18 @@ func getHeader(hdr string, md map[string]string) string {
} }
func getHeaders(m *codec.Message) { func getHeaders(m *codec.Message) {
get := func(hdr, v string) string { set := func(v, hdr string) string {
if len(v) > 0 { if len(v) > 0 {
return v return v
} }
return m.Header[hdr]
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
} }
m.Id = get("Micro-Id", m.Id) m.Id = set(m.Id, "Micro-Id")
m.Error = get("Micro-Error", m.Error) m.Error = set(m.Error, "Micro-Error")
m.Endpoint = get("Micro-Endpoint", m.Endpoint) m.Endpoint = set(m.Endpoint, "Micro-Endpoint")
m.Method = get("Micro-Method", m.Method) m.Method = set(m.Method, "Micro-Method")
m.Target = get("Micro-Service", m.Target) m.Target = set(m.Target, "Micro-Service")
// TODO: remove this cruft // TODO: remove this cruft
if len(m.Endpoint) == 0 { if len(m.Endpoint) == 0 {
@ -321,7 +315,6 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// write an error if it failed // write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error() m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error
m.Header["Micro-Error"] = m.Error m.Header["Micro-Error"] = m.Error
// no body to write // no body to write
if err := c.codec.Write(m, nil); err != nil { if err := c.codec.Write(m, nil); err != nil {

View File

@ -549,6 +549,7 @@ func (s *rpcServer) Register() error {
node.Metadata["protocol"] = "mucp" node.Metadata["protocol"] = "mucp"
s.RLock() s.RLock()
// Maps are ordered randomly, sort the keys for consistency // Maps are ordered randomly, sort the keys for consistency
var handlerList []string var handlerList []string
for n, e := range s.handlers { for n, e := range s.handlers {
@ -557,6 +558,7 @@ func (s *rpcServer) Register() error {
handlerList = append(handlerList, n) handlerList = append(handlerList, n)
} }
} }
sort.Strings(handlerList) sort.Strings(handlerList)
var subscriberList []Subscriber var subscriberList []Subscriber
@ -566,18 +568,20 @@ func (s *rpcServer) Register() error {
subscriberList = append(subscriberList, e) subscriberList = append(subscriberList, e)
} }
} }
sort.Slice(subscriberList, func(i, j int) bool { sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].Topic() > subscriberList[j].Topic() return subscriberList[i].Topic() > subscriberList[j].Topic()
}) })
endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, n := range handlerList { for _, n := range handlerList {
endpoints = append(endpoints, s.handlers[n].Endpoints()...) endpoints = append(endpoints, s.handlers[n].Endpoints()...)
} }
for _, e := range subscriberList { for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...) endpoints = append(endpoints, e.Endpoints()...)
} }
s.RUnlock()
service := &registry.Service{ service := &registry.Service{
Name: config.Name, Name: config.Name,
@ -586,9 +590,10 @@ func (s *rpcServer) Register() error {
Endpoints: endpoints, Endpoints: endpoints,
} }
s.Lock() // get registered value
registered := s.registered registered := s.registered
s.Unlock()
s.RUnlock()
if !registered { if !registered {
log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
@ -610,6 +615,8 @@ func (s *rpcServer) Register() error {
defer s.Unlock() defer s.Unlock()
s.registered = true s.registered = true
// set what we're advertising
s.opts.Advertise = addr
// subscribe to the topic with own name // subscribe to the topic with own name
sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent)

View File

@ -9,9 +9,9 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/cmd" "github.com/micro/go-micro/config/cmd"
"github.com/micro/go-micro/debug/service/handler"
"github.com/micro/go-micro/debug/profile" "github.com/micro/go-micro/debug/profile"
"github.com/micro/go-micro/debug/profile/pprof" "github.com/micro/go-micro/debug/profile/pprof"
"github.com/micro/go-micro/debug/service/handler"
"github.com/micro/go-micro/plugin" "github.com/micro/go-micro/plugin"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"

View File

@ -327,6 +327,7 @@ func (t *tun) process() {
// and the message is being sent outbound via // and the message is being sent outbound via
// a dialled connection don't use this link // a dialled connection don't use this link
if loopback && msg.outbound { if loopback && msg.outbound {
log.Tracef("Link for node %s is loopback", node)
err = errors.New("link is loopback") err = errors.New("link is loopback")
continue continue
} }
@ -334,6 +335,7 @@ func (t *tun) process() {
// if the message was being returned by the loopback listener // if the message was being returned by the loopback listener
// send it back up the loopback link only // send it back up the loopback link only
if msg.loopback && !loopback { if msg.loopback && !loopback {
log.Tracef("Link for message %s is loopback", node)
err = errors.New("link is not loopback") err = errors.New("link is not loopback")
continue continue
} }
@ -363,7 +365,7 @@ func (t *tun) process() {
// send the message // send the message
for _, link := range sendTo { for _, link := range sendTo {
// send the message via the current link // send the message via the current link
log.Tracef("Sending %+v to %s", newMsg.Header, link.Remote()) log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote())
if errr := link.Send(newMsg); errr != nil { if errr := link.Send(newMsg); errr != nil {
log.Debugf("Tunnel error sending %+v to %s: %v", newMsg.Header, link.Remote(), errr) log.Debugf("Tunnel error sending %+v to %s: %v", newMsg.Header, link.Remote(), errr)
@ -502,6 +504,7 @@ func (t *tun) listen(link *link) {
// nothing more to do // nothing more to do
continue continue
case "close": case "close":
log.Debugf("Tunnel link %s received close message", link.Remote())
// if there is no channel then we close the link // if there is no channel then we close the link
// as its a signal from the other side to close the connection // as its a signal from the other side to close the connection
if len(channel) == 0 { if len(channel) == 0 {
@ -555,9 +558,11 @@ func (t *tun) listen(link *link) {
// a continued session // a continued session
case "session": case "session":
// process message // process message
log.Tracef("Received %+v from %s", msg.Header, link.Remote()) log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
// an announcement of a channel listener // an announcement of a channel listener
case "announce": case "announce":
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
// process the announcement // process the announcement
channels := strings.Split(channel, ",") channels := strings.Split(channel, ",")
@ -629,7 +634,7 @@ func (t *tun) listen(link *link) {
s, exists = t.getSession(channel, "listener") s, exists = t.getSession(channel, "listener")
// only return accept to the session // only return accept to the session
case mtype == "accept": case mtype == "accept":
log.Debugf("Received accept message for channel: %s session: %s", channel, sessionId) log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId)
s, exists = t.getSession(channel, sessionId) s, exists = t.getSession(channel, sessionId)
if exists && s.accepted { if exists && s.accepted {
continue continue
@ -649,7 +654,7 @@ func (t *tun) listen(link *link) {
// bail if no session or listener has been found // bail if no session or listener has been found
if !exists { if !exists {
log.Debugf("Tunnel skipping no channel: %s session: %s exists", channel, sessionId) log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId)
// drop it, we don't care about // drop it, we don't care about
// messages we don't know about // messages we don't know about
continue continue
@ -665,7 +670,7 @@ func (t *tun) listen(link *link) {
// otherwise process // otherwise process
} }
log.Debugf("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype) log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype)
// construct a new transport message // construct a new transport message
tmsg := &transport.Message{ tmsg := &transport.Message{
@ -740,7 +745,7 @@ func (t *tun) keepalive(link *link) {
"Micro-Tunnel-Id": t.id, "Micro-Tunnel-Id": t.id,
}, },
}); err != nil { }); err != nil {
log.Debugf("Error sending keepalive to link %v: %v", link.Remote(), err) log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
t.delLink(link.Remote()) t.delLink(link.Remote())
return return
} }

View File

@ -2,6 +2,7 @@ package tunnel
import ( import (
"bytes" "bytes"
"errors"
"io" "io"
"sync" "sync"
"time" "time"
@ -14,7 +15,11 @@ import (
type link struct { type link struct {
transport.Socket transport.Socket
// transport to use for connections
transport transport.Transport
sync.RWMutex sync.RWMutex
// stops the link // stops the link
closed chan bool closed chan bool
// link state channel for testing link // link state channel for testing link
@ -65,6 +70,8 @@ var (
linkRequest = []byte{0, 0, 0, 0} linkRequest = []byte{0, 0, 0, 0}
// the 4 byte 1 filled packet sent to determine link state // the 4 byte 1 filled packet sent to determine link state
linkResponse = []byte{1, 1, 1, 1} linkResponse = []byte{1, 1, 1, 1}
ErrLinkConnectTimeout = errors.New("link connect timeout")
) )
func newLink(s transport.Socket) *link { func newLink(s transport.Socket) *link {
@ -72,8 +79,8 @@ func newLink(s transport.Socket) *link {
Socket: s, Socket: s,
id: uuid.New().String(), id: uuid.New().String(),
lastKeepAlive: time.Now(), lastKeepAlive: time.Now(),
channels: make(map[string]time.Time),
closed: make(chan bool), closed: make(chan bool),
channels: make(map[string]time.Time),
state: make(chan *packet, 64), state: make(chan *packet, 64),
sendQueue: make(chan *packet, 128), sendQueue: make(chan *packet, 128),
recvQueue: make(chan *packet, 128), recvQueue: make(chan *packet, 128),
@ -87,6 +94,32 @@ func newLink(s transport.Socket) *link {
return l return l
} }
func (l *link) connect(addr string) error {
c, err := l.transport.Dial(addr)
if err != nil {
return err
}
l.Lock()
l.Socket = c
l.Unlock()
return nil
}
func (l *link) accept(sock transport.Socket) error {
l.Lock()
l.Socket = sock
l.Unlock()
return nil
}
func (l *link) setLoopback(v bool) {
l.Lock()
l.loopback = v
l.Unlock()
}
// setRate sets the bits per second rate as a float64 // setRate sets the bits per second rate as a float64
func (l *link) setRate(bits int64, delta time.Duration) { func (l *link) setRate(bits int64, delta time.Duration) {
// rate of send in bits per nanosecond // rate of send in bits per nanosecond
@ -201,11 +234,15 @@ func (l *link) manage() {
t := time.NewTicker(time.Minute) t := time.NewTicker(time.Minute)
defer t.Stop() defer t.Stop()
// get link id
linkId := l.Id()
// used to send link state packets // used to send link state packets
send := func(b []byte) error { send := func(b []byte) error {
return l.Send(&transport.Message{ return l.Send(&transport.Message{
Header: map[string]string{ Header: map[string]string{
"Micro-Method": "link", "Micro-Method": "link",
"Micro-Link-Id": linkId,
}, Body: b, }, Body: b,
}) })
} }
@ -229,9 +266,7 @@ func (l *link) manage() {
// check the type of message // check the type of message
switch { switch {
case bytes.Equal(p.message.Body, linkRequest): case bytes.Equal(p.message.Body, linkRequest):
l.RLock() log.Tracef("Link %s received link request", linkId)
log.Tracef("Link %s received link request %v", l.id, p.message.Body)
l.RUnlock()
// send response // send response
if err := send(linkResponse); err != nil { if err := send(linkResponse); err != nil {
@ -242,9 +277,7 @@ func (l *link) manage() {
case bytes.Equal(p.message.Body, linkResponse): case bytes.Equal(p.message.Body, linkResponse):
// set round trip time // set round trip time
d := time.Since(now) d := time.Since(now)
l.RLock() log.Tracef("Link %s received link response in %v", linkId, d)
log.Tracef("Link %s received link response in %v", p.message.Body, d)
l.RUnlock()
// set the RTT // set the RTT
l.setRTT(d) l.setRTT(d)
} }
@ -309,6 +342,12 @@ func (l *link) Rate() float64 {
return l.rate return l.rate
} }
func (l *link) Loopback() bool {
l.RLock()
defer l.RUnlock()
return l.loopback
}
// Length returns the roundtrip time as nanoseconds (lower is better). // Length returns the roundtrip time as nanoseconds (lower is better).
// Returns 0 where no measurement has been taken. // Returns 0 where no measurement has been taken.
func (l *link) Length() int64 { func (l *link) Length() int64 {
@ -320,7 +359,6 @@ func (l *link) Length() int64 {
func (l *link) Id() string { func (l *link) Id() string {
l.RLock() l.RLock()
defer l.RUnlock() defer l.RUnlock()
return l.id return l.id
} }

View File

@ -80,7 +80,7 @@ func (t *tunListener) process() {
// get a session // get a session
sess, ok := conns[sessionId] sess, ok := conns[sessionId]
log.Debugf("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok) log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok)
if !ok { if !ok {
// we only process open and session types // we only process open and session types
switch m.typ { switch m.typ {
@ -159,7 +159,7 @@ func (t *tunListener) process() {
case <-sess.closed: case <-sess.closed:
delete(conns, sessionId) delete(conns, sessionId)
case sess.recv <- m: case sess.recv <- m:
log.Debugf("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ) log.Tracef("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ)
} }
} }
} }

View File

@ -356,11 +356,11 @@ func (s *session) Recv(m *transport.Message) error {
} }
//log.Tracef("Received %+v from recv backlog", msg) //log.Tracef("Received %+v from recv backlog", msg)
log.Debugf("Received %+v from recv backlog", msg) log.Tracef("Received %+v from recv backlog", msg)
// decrypt the received payload using the token // decrypt the received payload using the token
// we have to used msg.session because multicast has a shared // we have to used msg.session because multicast has a shared
// session id of "multicast" in this session struct on // session id of "multicast" in this session struct on
// the listener side // the listener side
body, err := Decrypt(msg.data.Body, s.token+s.channel+msg.session) body, err := Decrypt(msg.data.Body, s.token+s.channel+msg.session)
if err != nil { if err != nil {

View File

@ -64,7 +64,9 @@ type Link interface {
Length() int64 Length() int64
// Current transfer rate as bits per second (lower is better) // Current transfer rate as bits per second (lower is better)
Rate() float64 Rate() float64
// State of the link e.g connected/closed // Is this a loopback link
Loopback() bool
// State of the link: connected/closed/error
State() string State() string
// honours transport socket // honours transport socket
transport.Socket transport.Socket