Support reconnects

This commit is contained in:
Asim Aslam 2019-10-13 18:36:22 +01:00
parent d6c6e7815e
commit f77df51f60
4 changed files with 68 additions and 39 deletions

View File

@ -175,9 +175,6 @@ func (n *network) Name() string {
func (n *network) resolveNodes() ([]string, error) {
// resolve the network address to network nodes
records, err := n.options.Resolver.Resolve(n.options.Name)
if err != nil {
return nil, err
}
nodeMap := make(map[string]bool)
@ -209,6 +206,7 @@ func (n *network) resolveNodes() ([]string, error) {
// resolve anything that looks like a host name
records, err := dns.Resolve(node)
if err != nil {
log.Debugf("Failed to resolve %v %v", node, err)
continue
}
@ -220,7 +218,7 @@ func (n *network) resolveNodes() ([]string, error) {
}
}
return nodes, nil
return nodes, err
}
// resolve continuously resolves network nodes and initializes network tunnel with resolved addresses
@ -347,7 +345,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
if pbNetPeer.Node.Id == n.options.Id {
continue
}
log.Debugf("Network received peer message from: %s", pbNetPeer.Node.Id)
log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address)
peer := &node{
id: pbNetPeer.Node.Id,
address: pbNetPeer.Node.Address,
@ -771,14 +769,25 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
}
}
func (n *network) sendConnect() {
// send connect message to NetworkChannel
// NOTE: in theory we could do this as soon as
// Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first
msg := &pbNet.Connect{
Node: &pbNet.Node{
Id: n.node.id,
Address: n.node.address,
},
}
if err := n.sendMsg("connect", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to send connect message: %s", err)
}
}
// Connect connects the network
func (n *network) Connect() error {
n.Lock()
// return if already connected
if n.connected {
n.Unlock()
return nil
}
// try to resolve network nodes
nodes, err := n.resolveNodes()
@ -797,6 +806,15 @@ func (n *network) Connect() error {
return err
}
// return if already connected
if n.connected {
// unlock first
n.Unlock()
// send the connect message
n.sendConnect()
return nil
}
// set our internal node address
// if advertise address is not set
if len(n.options.Advertise) == 0 {
@ -858,19 +876,8 @@ func (n *network) Connect() error {
}
n.Unlock()
// send connect message to NetworkChannel
// NOTE: in theory we could do this as soon as
// Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first
msg := &pbNet.Connect{
Node: &pbNet.Node{
Id: n.node.id,
Address: n.node.address,
},
}
if err := n.sendMsg("connect", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to send connect message: %s", err)
}
// send the connect message
n.sendConnect()
// go resolving network nodes
go n.resolve()

View File

@ -18,6 +18,10 @@ func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
port = "8085"
}
if len(host) == 0 {
host = "localhost"
}
addrs, err := net.LookupHost(host)
if err != nil {
return nil, err

View File

@ -9,6 +9,7 @@ import (
pbNet "github.com/micro/go-micro/network/proto"
"github.com/micro/go-micro/router"
pbRtr "github.com/micro/go-micro/router/proto"
"github.com/micro/go-micro/util/log"
)
// Network implements network handler
@ -78,11 +79,16 @@ func (n *Network) Connect(ctx context.Context, req *pbNet.ConnectRequest, resp *
nodes = append(nodes, node.Address)
}
log.Infof("Network.Connect setting peers: %v", nodes)
// reinitialise the peers
n.Network.Init(
network.Peers(nodes...),
)
// call the connect method
n.Network.Connect()
return nil
}

View File

@ -777,6 +777,30 @@ func (t *tun) setupLink(node string) (*link, error) {
return link, nil
}
func (t *tun) setupLinks() {
for _, node := range t.options.Nodes {
// skip zero length nodes
if len(node) == 0 {
continue
}
// link already exists
if _, ok := t.links[node]; ok {
continue
}
// connect to node and return link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to establish node link to %s: %v", node, err)
continue
}
// save the link
t.links[node] = link
}
}
// connect the tunnel to all the nodes and listen for incoming tunnel connections
func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address)
@ -816,22 +840,8 @@ func (t *tun) connect() error {
}
}()
for _, node := range t.options.Nodes {
// skip zero length nodes
if len(node) == 0 {
continue
}
// connect to node and return link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to establish node link to %s: %v", node, err)
continue
}
// save the link
t.links[node] = link
}
// setup links
t.setupLinks()
// process outbound messages to be sent
// process sends to all links
@ -850,6 +860,8 @@ func (t *tun) Connect() error {
// already connected
if t.connected {
// setup links
t.setupLinks()
return nil
}