diff --git a/client/rpc_client.go b/client/rpc_client.go index 75ce75ac..4da6df45 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -316,6 +316,22 @@ func (r *rpcClient) Options() Options { return r.opts } +// hasProxy checks if we have proxy set in the environment +func (r *rpcClient) hasProxy() bool { + // get proxy + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + return true + } + + // get proxy address + if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { + return true + } + + return false +} + +// next returns an iterator for the next nodes to call func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { service := request.Service() @@ -431,10 +447,18 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return err } - ch := make(chan error, callOpts.Retries+1) + // get the retries + retries := callOpts.Retries + + // disable retries when using a proxy + if r.hasProxy() { + retries = 0 + } + + ch := make(chan error, retries+1) var gerr error - for i := 0; i <= callOpts.Retries; i++ { + for i := 0; i <= retries; i++ { go func(i int) { ch <- call(i) }(i) @@ -514,10 +538,18 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt err error } - ch := make(chan response, callOpts.Retries+1) + // get the retries + retries := callOpts.Retries + + // disable retries when using a proxy + if r.hasProxy() { + retries = 0 + } + + ch := make(chan response, retries+1) var grr error - for i := 0; i <= callOpts.Retries; i++ { + for i := 0; i <= retries; i++ { go func(i int) { s, err := call(i) ch <- response{s, err} diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 60dc02b5..a71f6a11 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -88,32 +88,24 @@ func (rwc *readWriteCloser) Close() error { } func getHeaders(m *codec.Message) { - get := func(hdr string) string { - if hd := m.Header[hdr]; len(hd) > 0 { - return hd + set := func(v, hdr string) 
string { + if len(v) > 0 { + return v } - // old - return m.Header["X-"+hdr] + return m.Header[hdr] } // check error in header - if len(m.Error) == 0 { - m.Error = get("Micro-Error") - } + m.Error = set(m.Error, "Micro-Error") // check endpoint in header - if len(m.Endpoint) == 0 { - m.Endpoint = get("Micro-Endpoint") - } + m.Endpoint = set(m.Endpoint, "Micro-Endpoint") // check method in header - if len(m.Method) == 0 { - m.Method = get("Micro-Method") - } + m.Method = set(m.Method, "Micro-Method") - if len(m.Id) == 0 { - m.Id = get("Micro-Id") - } + // set the request id + m.Id = set(m.Id, "Micro-Id") } func setHeaders(m *codec.Message, stream string) { @@ -122,7 +114,6 @@ func setHeaders(m *codec.Message, stream string) { return } m.Header[hdr] = v - m.Header["X-"+hdr] = v } set("Micro-Id", m.Id) diff --git a/network/default.go b/network/default.go index 4e5d96d7..26789436 100644 --- a/network/default.go +++ b/network/default.go @@ -71,9 +71,18 @@ type network struct { connected bool // closed closes the network closed chan bool - // whether we've announced the first connect successfully - // and received back some sort of peer message - announced chan bool + // whether we've been discovered by the network + discovered chan bool + // solicited checks whether routes were solicited by one node + solicited chan string +} + +// message is network message +type message struct { + // msg is transport message + msg *transport.Message + // session is tunnel session + session tunnel.Session } // newNetwork returns a new network node @@ -149,14 +158,16 @@ func newNetwork(opts ...Option) Network { address: peerAddress, peers: make(map[string]*node), }, - options: options, - router: options.Router, - proxy: options.Proxy, - tunnel: options.Tunnel, - server: server, - client: client, - tunClient: make(map[string]transport.Client), - peerLinks: make(map[string]tunnel.Link), + options: options, + router: options.Router, + proxy: options.Proxy, + tunnel: options.Tunnel, + server: 
server, + client: client, + tunClient: make(map[string]transport.Client), + peerLinks: make(map[string]tunnel.Link), + discovered: make(chan bool, 1), + solicited: make(chan string, 1), } network.node.network = network @@ -278,6 +289,14 @@ func (n *network) handleNetConn(s tunnel.Session, msg chan *message) { continue } + // check if peer is set + peer := m.Header["Micro-Peer"] + + // check who the message is intended for + if len(peer) > 0 && peer != n.options.Id { + continue + } + select { case msg <- &message{ msg: m, @@ -367,26 +386,33 @@ func (n *network) processNetChan(listener tunnel.Listener) { // mark the time the message has been received now := time.Now() pbNetConnect := &pbNet.Connect{} + if err := proto.Unmarshal(m.msg.Body, pbNetConnect); err != nil { log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) continue } + // don't process your own messages if pbNetConnect.Node.Id == n.options.Id { continue } + log.Debugf("Network received connect message from: %s", pbNetConnect.Node.Id) + peer := &node{ id: pbNetConnect.Node.Id, address: pbNetConnect.Node.Address, peers: make(map[string]*node), lastSeen: now, } + // update peer links log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetConnect.Node.Address) + if err := n.updatePeerLinks(pbNetConnect.Node.Address, m.session.Link()); err != nil { log.Debugf("Network failed updating peer links: %s", err) } + // add peer to the list of node peers if err := n.node.AddPeer(peer); err == ErrPeerExists { log.Debugf("Network peer exists, refreshing: %s", peer.id) @@ -394,50 +420,76 @@ func (n *network) processNetChan(listener tunnel.Listener) { if err := n.RefreshPeer(peer.id, now); err != nil { log.Debugf("Network failed refreshing peer %s: %v", peer.id, err) } - continue } + + // we send the peer message because someone has sent connect + // and wants to know what's on the network. 
The faster we + // respond the faster we start to converge + // get node peers down to MaxDepth encoded in protobuf msg := PeersToProto(n.node, MaxDepth) + node := pbNetConnect.Node.Id + // advertise yourself to the network - if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { + if err := n.sendTo("peer", NetworkChannel, node, msg); err != nil { log.Debugf("Network failed to advertise peers: %v", err) } + // advertise all the routes when a new node has connected if err := n.router.Solicit(); err != nil { log.Debugf("Network failed to solicit routes: %s", err) } + + // specify that we're soliciting + select { + case n.solicited <- node: + default: + // don't block + } case "peer": // mark the time the message has been received now := time.Now() pbNetPeer := &pbNet.Peer{} + if err := proto.Unmarshal(m.msg.Body, pbNetPeer); err != nil { log.Debugf("Network tunnel [%s] peer unmarshal error: %v", NetworkChannel, err) continue } + // don't process your own messages if pbNetPeer.Node.Id == n.options.Id { continue } + log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address) + peer := &node{ id: pbNetPeer.Node.Id, address: pbNetPeer.Node.Address, peers: make(map[string]*node), lastSeen: now, } + // update peer links log.Tracef("Network updating peer link %s for peer: %s", m.session.Link(), pbNetPeer.Node.Address) + if err := n.updatePeerLinks(pbNetPeer.Node.Address, m.session.Link()); err != nil { log.Debugf("Network failed updating peer links: %s", err) } + if err := n.node.AddPeer(peer); err == nil { // send a solicit message when discovering new peer msg := &pbRtr.Solicit{ Id: n.options.Id, } - if err := n.sendMsg("solicit", msg, ControlChannel); err != nil { + + node := pbNetPeer.Node.Id + + // only solicit this peer + if err := n.sendTo("solicit", ControlChannel, node, msg); err != nil { log.Debugf("Network failed to send solicit message: %s", err) } + continue // we're expecting any error to be ErrPeerExists } else 
if err != ErrPeerExists { @@ -446,6 +498,7 @@ func (n *network) processNetChan(listener tunnel.Listener) { } log.Debugf("Network peer exists, refreshing: %s", pbNetPeer.Node.Id) + // update lastSeen time for the peer if err := n.RefreshPeer(pbNetPeer.Node.Id, now); err != nil { log.Debugf("Network failed refreshing peer %s: %v", pbNetPeer.Node.Id, err) @@ -458,12 +511,12 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network failed to update peers: %v", err) } - // check if we announced and were discovered + // tell the connect loop that we've been discovered + // so it stops sending connect messages out select { - case <-n.announced: - // we've sent the connect and received this response + case n.discovered <- true: default: - close(n.announced) + // don't block here } case "close": pbNetClose := &pbNet.Close{} @@ -471,22 +524,28 @@ func (n *network) processNetChan(listener tunnel.Listener) { log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) continue } + // don't process your own messages if pbNetClose.Node.Id == n.options.Id { continue } + log.Debugf("Network received close message from: %s", pbNetClose.Node.Id) + peer := &node{ id: pbNetClose.Node.Id, address: pbNetClose.Node.Address, } + if err := n.DeletePeerNode(peer.id); err != nil { log.Debugf("Network failed to delete node %s routes: %v", peer.id, err) } + if err := n.prunePeerRoutes(peer); err != nil { log.Debugf("Network failed pruning peer %s routes: %v", peer.id, err) } - // deelete peer from the peerLinks + + // delete peer from the peerLinks n.Lock() delete(n.peerLinks, pbNetClose.Node.Address) n.Unlock() @@ -497,57 +556,6 @@ func (n *network) processNetChan(listener tunnel.Listener) { } } -// sendMsg sends a message to the tunnel channel -func (n *network) sendMsg(method string, msg proto.Message, channel string) error { - body, err := proto.Marshal(msg) - if err != nil { - return err - } - // create transport message and chuck it down the 
pipe - m := transport.Message{ - Header: map[string]string{ - "Micro-Method": method, - }, - Body: body, - } - - // check if the channel client is initialized - n.RLock() - client, ok := n.tunClient[channel] - if !ok || client == nil { - n.RUnlock() - return ErrClientNotFound - } - n.RUnlock() - - log.Debugf("Network sending %s message from: %s", method, n.options.Id) - if err := client.Send(&m); err != nil { - return err - } - - return nil -} - -// announce announces node peers to the network -func (n *network) announce(client transport.Client) { - announce := time.NewTicker(AnnounceTime) - defer announce.Stop() - - for { - select { - case <-n.closed: - return - case <-announce.C: - msg := PeersToProto(n.node, MaxDepth) - // advertise yourself to the network - if err := n.sendMsg("peer", msg, NetworkChannel); err != nil { - log.Debugf("Network failed to advertise peers: %v", err) - continue - } - } - } -} - // pruneRoutes prunes routes return by given query func (n *network) pruneRoutes(q ...router.QueryOption) error { routes, err := n.router.Table().Query(q...) @@ -585,9 +593,12 @@ func (n *network) prunePeerRoutes(peer *node) error { return nil } -// prune deltes node peers that have not been seen for longer than PruneTime seconds -// prune also removes all the routes either originated by or routable by the stale nodes -func (n *network) prune() { +// manage the process of announcing to peers and prune any peer nodes that have not been +// seen for a period of time. 
Also removes all the routes either originated by or routable +//by the stale nodes +func (n *network) manage() { + announce := time.NewTicker(AnnounceTime) + defer announce.Stop() prune := time.NewTicker(PruneTime) defer prune.Stop() @@ -595,8 +606,14 @@ func (n *network) prune() { select { case <-n.closed: return + case <-announce.C: + msg := PeersToProto(n.node, MaxDepth) + // advertise yourself to the network + if err := n.sendMsg("peer", NetworkChannel, msg); err != nil { + log.Debugf("Network failed to advertise peers: %v", err) + } case <-prune.C: - pruned := n.PruneStalePeerNodes(PruneTime) + pruned := n.PruneStalePeers(PruneTime) for id, peer := range pruned { log.Debugf("Network peer exceeded prune time: %s", id) n.Lock() @@ -606,29 +623,90 @@ func (n *network) prune() { log.Debugf("Network failed pruning peer %s routes: %v", id, err) } } + // get a list of all routes routes, err := n.options.Router.Table().List() if err != nil { log.Debugf("Network failed listing routes when pruning peers: %v", err) continue } + // collect all the router IDs in the routing table routers := make(map[string]bool) + for _, route := range routes { - if _, ok := routers[route.Router]; !ok { - routers[route.Router] = true - // if the router is NOT in our peer graph, delete all routes originated by it - if peerNode := n.node.GetPeerNode(route.Router); peerNode == nil { - if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil { - log.Debugf("Network failed deleting routes by %s: %v", route.Router, err) - } - } + // check if its been processed + if _, ok := routers[route.Router]; ok { + continue + } + + // mark as processed + routers[route.Router] = true + + // if the router is NOT in our peer graph, delete all routes originated by it + if peer := n.node.GetPeerNode(route.Router); peer != nil { + continue + } + + if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil { + log.Debugf("Network failed deleting routes by %s: %v", route.Router, err) } } } } 
} +// sendTo sends a message to a specific node as a one off. +// we need this because when links die, we have no discovery info, +// and sending to an existing multicast link doesn't immediately work +func (n *network) sendTo(method, channel, peer string, msg proto.Message) error { + body, err := proto.Marshal(msg) + if err != nil { + return err + } + c, err := n.tunnel.Dial(channel, tunnel.DialMode(tunnel.Multicast)) + if err != nil { + return err + } + defer c.Close() + + log.Debugf("Network sending %s message from: %s to %s", method, n.options.Id, peer) + + return c.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Method": method, + "Micro-Peer": peer, + }, + Body: body, + }) +} + +// sendMsg sends a message to the tunnel channel +func (n *network) sendMsg(method, channel string, msg proto.Message) error { + body, err := proto.Marshal(msg) + if err != nil { + return err + } + + // check if the channel client is initialized + n.RLock() + client, ok := n.tunClient[channel] + if !ok || client == nil { + n.RUnlock() + return ErrClientNotFound + } + n.RUnlock() + + log.Debugf("Network sending %s message from: %s", method, n.options.Id) + + return client.Send(&transport.Message{ + Header: map[string]string{ + "Micro-Method": method, + }, + Body: body, + }) +} + // handleCtrlConn handles ControlChannel connections func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { for { @@ -642,6 +720,14 @@ func (n *network) handleCtrlConn(s tunnel.Session, msg chan *message) { continue } + // check if peer is set + peer := m.Header["Micro-Peer"] + + // check who the message is intended for + if len(peer) > 0 && peer != n.options.Id { + continue + } + select { case msg <- &message{ msg: m, @@ -770,15 +856,19 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { switch m.msg.Header["Micro-Method"] { case "advert": pbRtrAdvert := &pbRtr.Advert{} + if err := proto.Unmarshal(m.msg.Body, pbRtrAdvert); err != nil { log.Debugf("Network fail to 
unmarshal advert message: %v", err) continue } + // don't process your own messages if pbRtrAdvert.Id == n.options.Id { continue } + log.Debugf("Network received advert message from: %s", pbRtrAdvert.Id) + // loookup advertising node in our peer topology advertNode := n.node.GetPeerNode(pbRtrAdvert.Id) if advertNode == nil { @@ -788,6 +878,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { } var events []*router.Event + for _, event := range pbRtrAdvert.Events { // we know the advertising node is not the origin of the route if pbRtrAdvert.Id != event.Route.Router { @@ -798,6 +889,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { continue } } + route := router.Route{ Service: event.Route.Service, Address: event.Route.Address, @@ -807,6 +899,7 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { Link: event.Route.Link, Metric: event.Route.Metric, } + // calculate route metric and add to the advertised metric // we need to make sure we do not overflow math.MaxInt64 metric := n.getRouteMetric(event.Route.Router, event.Route.Gateway, event.Route.Link) @@ -829,11 +922,13 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { } events = append(events, e) } + // if no events are eligible for processing continue if len(events) == 0 { log.Tracef("Network no events to be processed by router: %s", n.options.Id) continue } + // create an advert and process it advert := &router.Advert{ Id: pbRtrAdvert.Id, @@ -853,16 +948,27 @@ func (n *network) processCtrlChan(listener tunnel.Listener) { log.Debugf("Network fail to unmarshal solicit message: %v", err) continue } + log.Debugf("Network received solicit message from: %s", pbRtrSolicit.Id) + // ignore solicitation when requested by you if pbRtrSolicit.Id == n.options.Id { continue } + log.Debugf("Network router flushing routes for: %s", pbRtrSolicit.Id) + // advertise all the routes when a new node has connected if err := n.router.Solicit(); err != nil { log.Debugf("Network 
failed to solicit routes: %s", err) } + + // specify that someone solicited the route + select { + case n.solicited <- pbRtrSolicit.Id: + default: + // don't block + } } case <-n.closed: return @@ -879,6 +985,7 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { case advert := <-advertChan: // create a proto advert var events []*pbRtr.Event + for _, event := range advert.Events { // the routes service address address := event.Route.Address @@ -912,16 +1019,33 @@ func (n *network) advertise(advertChan <-chan *router.Advert) { } events = append(events, e) } + msg := &pbRtr.Advert{ Id: advert.Id, Type: pbRtr.AdvertType(advert.Type), Timestamp: advert.Timestamp.UnixNano(), Events: events, } - if err := n.sendMsg("advert", msg, ControlChannel); err != nil { - log.Debugf("Network failed to advertise routes: %v", err) + + // send the advert to all on the control channel + // since it's not a solicitation + if advert.Type != router.Solicitation { + if err := n.sendMsg("advert", ControlChannel, msg); err != nil { + log.Debugf("Network failed to advertise routes: %v", err) + } continue } + + // it's a solicitation, someone asked for it + // so we're going to pick off the node and send it + select { + case node := <-n.solicited: + // someone requested the route + n.sendTo("advert", ControlChannel, node, msg) + default: + // send to all since we can't get anything + n.sendMsg("advert", ControlChannel, msg) + } case <-n.closed: return } @@ -939,41 +1063,87 @@ func (n *network) sendConnect() { Address: n.node.address, }, } - if err := n.sendMsg("connect", msg, NetworkChannel); err != nil { + + if err := n.sendMsg("connect", NetworkChannel, msg); err != nil { log.Debugf("Network failed to send connect message: %s", err) } } -// connect will wait for a link to be established +// connect will wait for a link to be established and send the connect +// message. We're trying to ensure convergence pretty quickly. So we want +// to hear back. 
In the case we become completely disconnected we'll +// connect again once a new link is established func (n *network) connect() { - // wait for connected state - var connected bool + // discovered lets us know that we received a peer message back + var discovered bool + var attempts int + + // our advertise address + loopback := n.server.Options().Advertise + // actual address + address := n.tunnel.Address() for { - // check the links + // connected is used to define if the link is connected + var connected bool + + // check the links state for _, link := range n.tunnel.Links() { + // skip loopback + if link.Loopback() { + continue + } + + // if remote is ourselves + switch link.Remote() { + case loopback, address: + continue + } + if link.State() == "connected" { connected = true break } } - // if we're not conencted wait + // if we're not connected wait if !connected { + // reset discovered + discovered = false + // sleep for a second time.Sleep(time.Second) + // now try again continue } - // send the connect message - n.sendConnect() + // we're connected but are we discovered? 
+ if !discovered { + // recreate the clients because all the tunnel links are gone + // so we haven't sent discovery beneath + if err := n.createClients(); err != nil { + log.Debugf("Failed to recreate network/control clients: %v", err) + continue + } - // check the announce channel + // send the connect message + n.sendConnect() + } + + // check if we've been discovered select { - case <-n.announced: + case <-n.discovered: + discovered = true + attempts = 0 + case <-n.closed: return - default: - time.Sleep(time.Second) - // we have to go again + case <-time.After(time.Second + backoff.Do(attempts)): + // we have to try again + attempts++ + + // reset attempts 5 == ~2mins + if attempts > 5 { + attempts = 0 + } } } } @@ -982,18 +1152,18 @@ func (n *network) connect() { func (n *network) Connect() error { n.Lock() - // try to resolve network nodes - nodes, err := n.resolveNodes() - if err != nil { - log.Debugf("Network failed to resolve nodes: %v", err) - } - // connect network tunnel if err := n.tunnel.Connect(); err != nil { n.Unlock() return err } + // try to resolve network nodes + nodes, err := n.resolveNodes() + if err != nil { + log.Debugf("Network failed to resolve nodes: %v", err) + } + // initialize the tunnel to resolved nodes n.tunnel.Init( tunnel.Nodes(nodes...), @@ -1048,8 +1218,6 @@ func (n *network) Connect() error { // create closed channel n.closed = make(chan bool) - // create new announced channel - n.announced = make(chan bool) // start the router if err := n.options.Router.Start(); err != nil { @@ -1073,16 +1241,14 @@ func (n *network) Connect() error { // send connect after there's a link established go n.connect() - // go resolving network nodes + // resolve network nodes and re-init the tunnel go n.resolve() - // broadcast peers - go n.announce(netClient) - // prune stale nodes - go n.prune() - // listen to network messages - go n.processNetChan(netListener) + // broadcast announcements and prune stale nodes + go n.manage() // advertise 
service routes go n.advertise(advertChan) + // listen to network messages + go n.processNetChan(netListener) // accept and process routes go n.processCtrlChan(ctrlListener) @@ -1112,6 +1278,40 @@ func (n *network) close() error { return nil } +// createClients is used to create new clients in the event we lose all the tunnels +func (n *network) createClients() error { + // dial into ControlChannel to send route adverts + ctrlClient, err := n.tunnel.Dial(ControlChannel, tunnel.DialMode(tunnel.Multicast)) + if err != nil { + return err + } + + // dial into NetworkChannel to send network messages + netClient, err := n.tunnel.Dial(NetworkChannel, tunnel.DialMode(tunnel.Multicast)) + if err != nil { + return err + } + + n.Lock() + defer n.Unlock() + + // set the control client + c, ok := n.tunClient[ControlChannel] + if ok { + c.Close() + } + n.tunClient[ControlChannel] = ctrlClient + + // set the network client + c, ok = n.tunClient[NetworkChannel] + if ok { + c.Close() + } + n.tunClient[NetworkChannel] = netClient + + return nil +} + // Close closes network connection func (n *network) Close() error { n.Lock() @@ -1140,7 +1340,7 @@ func (n *network) Close() error { Address: n.node.address, }, } - if err := n.sendMsg("close", msg, NetworkChannel); err != nil { + if err := n.sendMsg("close", NetworkChannel, msg); err != nil { log.Debugf("Network failed to send close message: %s", err) } } diff --git a/network/network.go b/network/network.go index 08a45665..8f71709c 100644 --- a/network/network.go +++ b/network/network.go @@ -56,14 +56,6 @@ type Network interface { Server() server.Server } -// message is network message -type message struct { - // msg is transport message - msg *transport.Message - // session is tunnel session - session tunnel.Session -} - // NewNetwork returns a new network interface func NewNetwork(opts ...Option) Network { return newNetwork(opts...) 
diff --git a/network/node.go b/network/node.go index f0d3809c..f1a51b7d 100644 --- a/network/node.go +++ b/network/node.go @@ -216,7 +216,7 @@ func (n *node) DeletePeerNode(id string) error { // PruneStalePeerNodes prune the peers that have not been seen for longer than given time // It returns a map of the the nodes that got pruned -func (n *node) PruneStalePeerNodes(pruneTime time.Duration) map[string]*node { +func (n *node) PruneStalePeers(pruneTime time.Duration) map[string]*node { n.Lock() defer n.Unlock() diff --git a/network/node_test.go b/network/node_test.go index 80a41c70..71b21a19 100644 --- a/network/node_test.go +++ b/network/node_test.go @@ -225,7 +225,7 @@ func TestPruneStalePeerNodes(t *testing.T) { time.Sleep(pruneTime) // should delete all nodes besides node - pruned := node.PruneStalePeerNodes(pruneTime) + pruned := node.PruneStalePeers(pruneTime) if len(pruned) != len(nodes)-1 { t.Errorf("Expected pruned node count: %d, got: %d", len(nodes)-1, len(pruned)) diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index 0e04c7d6..a7cf5bd6 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -83,7 +83,8 @@ func readLoop(r server.Request, s client.Stream) error { // toNodes returns a list of node addresses from given routes func toNodes(routes []router.Route) []string { - nodes := make([]string, len(routes)) + nodes := make([]string, 0, len(routes)) + for _, node := range routes { address := node.Address if len(node.Gateway) > 0 { @@ -91,11 +92,13 @@ func toNodes(routes []router.Route) []string { } nodes = append(nodes, address) } + return nodes } func toSlice(r map[uint64]router.Route) []router.Route { routes := make([]router.Route, 0, len(r)) + for _, v := range r { routes = append(routes, v) } @@ -161,6 +164,8 @@ func (p *Proxy) filterRoutes(ctx context.Context, routes []router.Route) []route filteredRoutes = append(filteredRoutes, route) } + log.Tracef("Proxy filtered routes %+v vs %+v\n", routes, filteredRoutes) + return filteredRoutes } @@ 
-225,13 +230,15 @@ func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) { // refreshMetrics will refresh any metrics for our local cached routes. // we may not receive new watch events for these as they change. func (p *Proxy) refreshMetrics() { - services := make([]string, 0, len(p.Routes)) - // get a list of services to update p.RLock() + + services := make([]string, 0, len(p.Routes)) + for service := range p.Routes { services = append(services, service) } + p.RUnlock() // get and cache the routes for the service @@ -246,6 +253,8 @@ func (p *Proxy) manageRoutes(route router.Route, action string) error { p.Lock() defer p.Unlock() + log.Tracef("Proxy taking route action %v %+v\n", action, route) + switch action { case "create", "update": if _, ok := p.Routes[route.Service]; !ok { @@ -253,7 +262,12 @@ func (p *Proxy) manageRoutes(route router.Route, action string) error { } p.Routes[route.Service][route.Hash()] = route case "delete": + // delete that specific route delete(p.Routes[route.Service], route.Hash()) + // clean up the cache entirely + if len(p.Routes[route.Service]) == 0 { + delete(p.Routes, route.Service) + } default: return fmt.Errorf("unknown action: %s", action) } @@ -288,7 +302,7 @@ func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error { // TODO: check that we're not broadcast storming by sending to the same topic // that we're actually subscribed to - log.Tracef("Received message for %s", msg.Topic()) + log.Tracef("Proxy received message for %s", msg.Topic()) var errors []string @@ -329,7 +343,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server return errors.BadRequest("go.micro.proxy", "service name is blank") } - log.Tracef("Received request for %s", service) + log.Tracef("Proxy received request for %s", service) // are we network routing or local routing if len(p.Links) == 0 { @@ -363,15 +377,17 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp 
server } //nolint:prealloc - var opts []client.CallOption - - // set strategy to round robin - opts = append(opts, client.WithSelectOption(selector.WithStrategy(selector.RoundRobin))) + opts := []client.CallOption{ + // set strategy to round robin + client.WithSelectOption(selector.WithStrategy(selector.RoundRobin)), + } // if the address is already set just serve it // TODO: figure it out if we should know to pick a link if len(addresses) > 0 { - opts = append(opts, client.WithAddress(addresses...)) + opts = append(opts, + client.WithAddress(addresses...), + ) // serve the normal way return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...) @@ -387,10 +403,16 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server opts = append(opts, client.WithAddress(addresses...)) } + log.Tracef("Proxy calling %+v\n", addresses) // serve the normal way return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...) } + // we're assuming we need routes to operate on + if len(routes) == 0 { + return errors.InternalServerError("go.micro.proxy", "route not found") + } + var gerr error // we're routing globally with multiple links @@ -404,11 +426,16 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server continue } - log.Debugf("Proxy using route %+v\n", route) + log.Tracef("Proxy using route %+v\n", route) // set the address to call addresses := toNodes([]router.Route{route}) - opts = append(opts, client.WithAddress(addresses...)) + // set the address in the options + // disable retries since its one route processing + opts = append(opts, + client.WithAddress(addresses...), + client.WithRetries(0), + ) // do the request with the link gerr = p.serveRequest(ctx, link, service, endpoint, req, rsp, opts...) 
@@ -558,7 +585,9 @@ func NewProxy(opts ...options.Option) proxy.Proxy { }() go func() { - t := time.NewTicker(time.Minute) + // TODO: speed up refreshing of metrics + // without this ticking effort e.g stream + t := time.NewTicker(time.Second * 10) defer t.Stop() // we must refresh route metrics since they do not trigger new events diff --git a/router/default.go b/router/default.go index b0739ebf..02ceac1b 100644 --- a/router/default.go +++ b/router/default.go @@ -799,7 +799,8 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) { // build a list of events to advertise events := make([]*Event, len(bestRoutes)) - i := 0 + var i int + for _, route := range bestRoutes { event := &Event{ Type: evType, @@ -823,9 +824,10 @@ func (r *router) Solicit() error { // advertise the routes r.advertWg.Add(1) + go func() { - defer r.advertWg.Done() - r.publishAdvert(RouteUpdate, events) + r.publishAdvert(Solicitation, events) + r.advertWg.Done() }() return nil diff --git a/router/router.go b/router/router.go index e6018766..7758817c 100644 --- a/router/router.go +++ b/router/router.go @@ -111,6 +111,8 @@ const ( Announce AdvertType = iota // RouteUpdate advertises route updates RouteUpdate + // Solicitation indicates routes were solicited + Solicitation ) // String returns human readable advertisement type @@ -120,6 +122,8 @@ func (t AdvertType) String() string { return "announce" case RouteUpdate: return "update" + case Solicitation: + return "solicitation" default: return "unknown" } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 342110aa..53d552f4 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -86,24 +86,18 @@ func getHeader(hdr string, md map[string]string) string { } func getHeaders(m *codec.Message) { - get := func(hdr, v string) string { + set := func(v, hdr string) string { if len(v) > 0 { return v } - - if hd := m.Header[hdr]; len(hd) > 0 { - return hd - } - - // old - return m.Header["X-"+hdr] + return m.Header[hdr] } - 
m.Id = get("Micro-Id", m.Id) - m.Error = get("Micro-Error", m.Error) - m.Endpoint = get("Micro-Endpoint", m.Endpoint) - m.Method = get("Micro-Method", m.Method) - m.Target = get("Micro-Service", m.Target) + m.Id = set(m.Id, "Micro-Id") + m.Error = set(m.Error, "Micro-Error") + m.Endpoint = set(m.Endpoint, "Micro-Endpoint") + m.Method = set(m.Method, "Micro-Method") + m.Target = set(m.Target, "Micro-Service") // TODO: remove this cruft if len(m.Endpoint) == 0 { @@ -321,7 +315,6 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { // write an error if it failed m.Error = errors.Wrapf(err, "Unable to encode body").Error() - m.Header["X-Micro-Error"] = m.Error m.Header["Micro-Error"] = m.Error // no body to write if err := c.codec.Write(m, nil); err != nil { diff --git a/server/rpc_server.go b/server/rpc_server.go index 191bf1e8..d23401c8 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -549,6 +549,7 @@ func (s *rpcServer) Register() error { node.Metadata["protocol"] = "mucp" s.RLock() + // Maps are ordered randomly, sort the keys for consistency var handlerList []string for n, e := range s.handlers { @@ -557,6 +558,7 @@ func (s *rpcServer) Register() error { handlerList = append(handlerList, n) } } + sort.Strings(handlerList) var subscriberList []Subscriber @@ -566,18 +568,20 @@ func (s *rpcServer) Register() error { subscriberList = append(subscriberList, e) } } + sort.Slice(subscriberList, func(i, j int) bool { return subscriberList[i].Topic() > subscriberList[j].Topic() }) endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) + for _, n := range handlerList { endpoints = append(endpoints, s.handlers[n].Endpoints()...) } + for _, e := range subscriberList { endpoints = append(endpoints, e.Endpoints()...) 
} - s.RUnlock() service := ®istry.Service{ Name: config.Name, @@ -586,9 +590,10 @@ func (s *rpcServer) Register() error { Endpoints: endpoints, } - s.Lock() + // get registered value registered := s.registered - s.Unlock() + + s.RUnlock() if !registered { log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) @@ -610,6 +615,8 @@ func (s *rpcServer) Register() error { defer s.Unlock() s.registered = true + // set what we're advertising + s.opts.Advertise = addr // subscribe to the topic with own name sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) diff --git a/service.go b/service.go index 915e6512..efddd0a4 100644 --- a/service.go +++ b/service.go @@ -9,9 +9,9 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/config/cmd" - "github.com/micro/go-micro/debug/service/handler" "github.com/micro/go-micro/debug/profile" "github.com/micro/go-micro/debug/profile/pprof" + "github.com/micro/go-micro/debug/service/handler" "github.com/micro/go-micro/plugin" "github.com/micro/go-micro/server" "github.com/micro/go-micro/util/log" diff --git a/tunnel/default.go b/tunnel/default.go index 6b1b9151..5943757f 100644 --- a/tunnel/default.go +++ b/tunnel/default.go @@ -327,6 +327,7 @@ func (t *tun) process() { // and the message is being sent outbound via // a dialled connection don't use this link if loopback && msg.outbound { + log.Tracef("Link for node %s is loopback", node) err = errors.New("link is loopback") continue } @@ -334,6 +335,7 @@ func (t *tun) process() { // if the message was being returned by the loopback listener // send it back up the loopback link only if msg.loopback && !loopback { + log.Tracef("Link for message %s is loopback", node) err = errors.New("link is not loopback") continue } @@ -363,7 +365,7 @@ func (t *tun) process() { // send the message for _, link := range sendTo { // send the message via the current link - log.Tracef("Sending %+v to %s", newMsg.Header, link.Remote()) + 
log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote()) if errr := link.Send(newMsg); errr != nil { log.Debugf("Tunnel error sending %+v to %s: %v", newMsg.Header, link.Remote(), errr) @@ -502,6 +504,7 @@ func (t *tun) listen(link *link) { // nothing more to do continue case "close": + log.Debugf("Tunnel link %s received close message", link.Remote()) // if there is no channel then we close the link // as its a signal from the other side to close the connection if len(channel) == 0 { @@ -555,9 +558,11 @@ func (t *tun) listen(link *link) { // a continued session case "session": // process message - log.Tracef("Received %+v from %s", msg.Header, link.Remote()) + log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote()) // an announcement of a channel listener case "announce": + log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote()) + // process the announcement channels := strings.Split(channel, ",") @@ -629,7 +634,7 @@ func (t *tun) listen(link *link) { s, exists = t.getSession(channel, "listener") // only return accept to the session case mtype == "accept": - log.Debugf("Received accept message for channel: %s session: %s", channel, sessionId) + log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId) s, exists = t.getSession(channel, sessionId) if exists && s.accepted { continue @@ -649,7 +654,7 @@ func (t *tun) listen(link *link) { // bail if no session or listener has been found if !exists { - log.Debugf("Tunnel skipping no channel: %s session: %s exists", channel, sessionId) + log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId) // drop it, we don't care about // messages we don't know about continue @@ -665,7 +670,7 @@ func (t *tun) listen(link *link) { // otherwise process } - log.Debugf("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype) + log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype) // 
construct a new transport message tmsg := &transport.Message{ @@ -740,7 +745,7 @@ func (t *tun) keepalive(link *link) { "Micro-Tunnel-Id": t.id, }, }); err != nil { - log.Debugf("Error sending keepalive to link %v: %v", link.Remote(), err) + log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err) t.delLink(link.Remote()) return } diff --git a/tunnel/link.go b/tunnel/link.go index 042f1cb9..a7367812 100644 --- a/tunnel/link.go +++ b/tunnel/link.go @@ -2,6 +2,7 @@ package tunnel import ( "bytes" + "errors" "io" "sync" "time" @@ -14,7 +15,11 @@ import ( type link struct { transport.Socket + // transport to use for connections + transport transport.Transport + sync.RWMutex + // stops the link closed chan bool // link state channel for testing link @@ -65,6 +70,8 @@ var ( linkRequest = []byte{0, 0, 0, 0} // the 4 byte 1 filled packet sent to determine link state linkResponse = []byte{1, 1, 1, 1} + + ErrLinkConnectTimeout = errors.New("link connect timeout") ) func newLink(s transport.Socket) *link { @@ -72,8 +79,8 @@ func newLink(s transport.Socket) *link { Socket: s, id: uuid.New().String(), lastKeepAlive: time.Now(), - channels: make(map[string]time.Time), closed: make(chan bool), + channels: make(map[string]time.Time), state: make(chan *packet, 64), sendQueue: make(chan *packet, 128), recvQueue: make(chan *packet, 128), @@ -87,6 +94,32 @@ func newLink(s transport.Socket) *link { return l } +func (l *link) connect(addr string) error { + c, err := l.transport.Dial(addr) + if err != nil { + return err + } + + l.Lock() + l.Socket = c + l.Unlock() + + return nil +} + +func (l *link) accept(sock transport.Socket) error { + l.Lock() + l.Socket = sock + l.Unlock() + return nil +} + +func (l *link) setLoopback(v bool) { + l.Lock() + l.loopback = v + l.Unlock() +} + // setRate sets the bits per second rate as a float64 func (l *link) setRate(bits int64, delta time.Duration) { // rate of send in bits per nanosecond @@ -201,11 +234,15 @@ func (l *link) 
manage() { t := time.NewTicker(time.Minute) defer t.Stop() + // get link id + linkId := l.Id() + // used to send link state packets send := func(b []byte) error { return l.Send(&transport.Message{ Header: map[string]string{ - "Micro-Method": "link", + "Micro-Method": "link", + "Micro-Link-Id": linkId, }, Body: b, }) } @@ -229,9 +266,7 @@ func (l *link) manage() { // check the type of message switch { case bytes.Equal(p.message.Body, linkRequest): - l.RLock() - log.Tracef("Link %s received link request %v", l.id, p.message.Body) - l.RUnlock() + log.Tracef("Link %s received link request", linkId) // send response if err := send(linkResponse); err != nil { @@ -242,9 +277,7 @@ func (l *link) manage() { case bytes.Equal(p.message.Body, linkResponse): // set round trip time d := time.Since(now) - l.RLock() - log.Tracef("Link %s received link response in %v", p.message.Body, d) - l.RUnlock() + log.Tracef("Link %s received link response in %v", linkId, d) // set the RTT l.setRTT(d) } @@ -309,6 +342,12 @@ func (l *link) Rate() float64 { return l.rate } +func (l *link) Loopback() bool { + l.RLock() + defer l.RUnlock() + return l.loopback +} + // Length returns the roundtrip time as nanoseconds (lower is better). // Returns 0 where no measurement has been taken. 
func (l *link) Length() int64 { @@ -320,7 +359,6 @@ func (l *link) Length() int64 { func (l *link) Id() string { l.RLock() defer l.RUnlock() - return l.id } diff --git a/tunnel/listener.go b/tunnel/listener.go index ec435b03..288b7dac 100644 --- a/tunnel/listener.go +++ b/tunnel/listener.go @@ -80,7 +80,7 @@ func (t *tunListener) process() { // get a session sess, ok := conns[sessionId] - log.Debugf("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok) + log.Tracef("Tunnel listener received channel %s session %s type %s exists: %t", m.channel, sessionId, m.typ, ok) if !ok { // we only process open and session types switch m.typ { @@ -159,7 +159,7 @@ func (t *tunListener) process() { case <-sess.closed: delete(conns, sessionId) case sess.recv <- m: - log.Debugf("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ) + log.Tracef("Tunnel listener sent to recv chan channel %s session %s type %s", m.channel, sessionId, m.typ) } } } diff --git a/tunnel/session.go b/tunnel/session.go index 3cb37c61..e545d316 100644 --- a/tunnel/session.go +++ b/tunnel/session.go @@ -356,11 +356,11 @@ func (s *session) Recv(m *transport.Message) error { } //log.Tracef("Received %+v from recv backlog", msg) - log.Debugf("Received %+v from recv backlog", msg) + log.Tracef("Received %+v from recv backlog", msg) // decrypt the received payload using the token // we have to used msg.session because multicast has a shared - // session id of "multicast" in this session struct on + // session id of "multicast" in this session struct on // the listener side body, err := Decrypt(msg.data.Body, s.token+s.channel+msg.session) if err != nil { diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 56928d8d..abd61cb4 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -64,7 +64,9 @@ type Link interface { Length() int64 // Current transfer rate as bits per second (lower is better) Rate() float64 - // State of the 
link e.g connected/closed + // Is this a loopback link + Loopback() bool + // State of the link: connected/closed/error State() string // honours transport socket transport.Socket