Merge pull request #706 from milosgajdos83/neighbour-map
Broadcast neighbourhood
This commit is contained in:
		| @@ -7,9 +7,10 @@ import ( | |||||||
| 	"github.com/golang/protobuf/proto" | 	"github.com/golang/protobuf/proto" | ||||||
| 	"github.com/micro/go-micro/client" | 	"github.com/micro/go-micro/client" | ||||||
| 	rtr "github.com/micro/go-micro/client/selector/router" | 	rtr "github.com/micro/go-micro/client/selector/router" | ||||||
|  | 	pbNet "github.com/micro/go-micro/network/proto" | ||||||
| 	"github.com/micro/go-micro/proxy" | 	"github.com/micro/go-micro/proxy" | ||||||
| 	"github.com/micro/go-micro/router" | 	"github.com/micro/go-micro/router" | ||||||
| 	pb "github.com/micro/go-micro/router/proto" | 	pbRtr "github.com/micro/go-micro/router/proto" | ||||||
| 	"github.com/micro/go-micro/server" | 	"github.com/micro/go-micro/server" | ||||||
| 	"github.com/micro/go-micro/transport" | 	"github.com/micro/go-micro/transport" | ||||||
| 	"github.com/micro/go-micro/tunnel" | 	"github.com/micro/go-micro/tunnel" | ||||||
| @@ -18,12 +19,24 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	// ControlChannel is the name of the tunnel channel for passing contron message | 	// NetworkChannel is the name of the tunnel channel for passing network messages | ||||||
|  | 	NetworkChannel = "network" | ||||||
|  | 	// ControlChannel is the name of the tunnel channel for passing control message | ||||||
| 	ControlChannel = "control" | 	ControlChannel = "control" | ||||||
| 	// DefaultLink is default network link | 	// DefaultLink is default network link | ||||||
| 	DefaultLink = "network" | 	DefaultLink = "network" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // node is network node | ||||||
|  | type node struct { | ||||||
|  | 	// id is node id | ||||||
|  | 	id string | ||||||
|  | 	// address is node address | ||||||
|  | 	address string | ||||||
|  | 	// neighbours are node neightbours | ||||||
|  | 	neighbours map[string]*node | ||||||
|  | } | ||||||
|  |  | ||||||
| // network implements Network interface | // network implements Network interface | ||||||
| type network struct { | type network struct { | ||||||
| 	// options configure the network | 	// options configure the network | ||||||
| @@ -39,11 +52,16 @@ type network struct { | |||||||
| 	// client is network client | 	// client is network client | ||||||
| 	client client.Client | 	client client.Client | ||||||
|  |  | ||||||
|  | 	// tunClient is a mao of tunnel clients keyed over channel names | ||||||
|  | 	tunClient map[string]transport.Client | ||||||
|  |  | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	// connected marks the network as connected | 	// connected marks the network as connected | ||||||
| 	connected bool | 	connected bool | ||||||
| 	// closed closes the network | 	// closed closes the network | ||||||
| 	closed chan bool | 	closed chan bool | ||||||
|  | 	// neighbours maps the node neighbourhood | ||||||
|  | 	neighbours map[string]*node | ||||||
| } | } | ||||||
|  |  | ||||||
| // newNetwork returns a new network node | // newNetwork returns a new network node | ||||||
| @@ -94,6 +112,8 @@ func newNetwork(opts ...Option) Network { | |||||||
| 		Tunnel:     options.Tunnel, | 		Tunnel:     options.Tunnel, | ||||||
| 		server:     server, | 		server:     server, | ||||||
| 		client:     client, | 		client:     client, | ||||||
|  | 		tunClient:  make(map[string]transport.Client), | ||||||
|  | 		neighbours: make(map[string]*node), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -107,6 +127,7 @@ func (n *network) Address() string { | |||||||
| 	return n.Tunnel.Address() | 	return n.Tunnel.Address() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // resolveNodes resolves network nodes to addresses | ||||||
| func (n *network) resolveNodes() ([]string, error) { | func (n *network) resolveNodes() ([]string, error) { | ||||||
| 	// resolve the network address to network nodes | 	// resolve the network address to network nodes | ||||||
| 	records, err := n.options.Resolver.Resolve(n.options.Name) | 	records, err := n.options.Resolver.Resolve(n.options.Name) | ||||||
| @@ -123,6 +144,7 @@ func (n *network) resolveNodes() ([]string, error) { | |||||||
| 	return nodes, nil | 	return nodes, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // resolve continuously resolves network nodes and initializes network tunnel with resolved addresses | ||||||
| func (n *network) resolve() { | func (n *network) resolve() { | ||||||
| 	resolve := time.NewTicker(ResolveTime) | 	resolve := time.NewTicker(ResolveTime) | ||||||
| 	defer resolve.Stop() | 	defer resolve.Stop() | ||||||
| @@ -145,7 +167,178 @@ func (n *network) resolve() { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (n *network) handleConn(conn tunnel.Conn, msg chan *transport.Message) { | // handleNetConn handles network announcement messages | ||||||
|  | func (n *network) handleNetConn(conn tunnel.Conn, msg chan *transport.Message) { | ||||||
|  | 	for { | ||||||
|  | 		m := new(transport.Message) | ||||||
|  | 		if err := conn.Recv(m); err != nil { | ||||||
|  | 			// TODO: should we bail here? | ||||||
|  | 			log.Debugf("Network tunnel [%s] receive error: %v", NetworkChannel, err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		select { | ||||||
|  | 		case msg <- m: | ||||||
|  | 		case <-n.closed: | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // acceptNetConn accepts connections from NetworkChannel | ||||||
|  | func (n *network) acceptNetConn(l tunnel.Listener, recv chan *transport.Message) { | ||||||
|  | 	for { | ||||||
|  | 		// accept a connection | ||||||
|  | 		conn, err := l.Accept() | ||||||
|  | 		if err != nil { | ||||||
|  | 			// TODO: handle this | ||||||
|  | 			log.Debugf("Network tunnel [%s] accept error: %v", NetworkChannel, err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		select { | ||||||
|  | 		case <-n.closed: | ||||||
|  | 			return | ||||||
|  | 		default: | ||||||
|  | 			// go handle NetworkChannel connection | ||||||
|  | 			go n.handleNetConn(conn, recv) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // processNetChan processes messages received on NetworkChannel | ||||||
|  | func (n *network) processNetChan(l tunnel.Listener) { | ||||||
|  | 	// receive network message queue | ||||||
|  | 	recv := make(chan *transport.Message, 128) | ||||||
|  |  | ||||||
|  | 	// accept NetworkChannel connections | ||||||
|  | 	go n.acceptNetConn(l, recv) | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case m := <-recv: | ||||||
|  | 			// switch on type of message and take action | ||||||
|  | 			switch m.Header["Micro-Method"] { | ||||||
|  | 			case "connect": | ||||||
|  | 				pbNetConnect := &pbNet.Connect{} | ||||||
|  | 				if err := proto.Unmarshal(m.Body, pbNetConnect); err != nil { | ||||||
|  | 					log.Debugf("Network tunnel [%s] connect unmarshal error: %v", NetworkChannel, err) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				// don't process your own messages | ||||||
|  | 				if pbNetConnect.Node.Id == n.options.Id { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				neighbour := &node{ | ||||||
|  | 					id:         pbNetConnect.Node.Id, | ||||||
|  | 					address:    pbNetConnect.Node.Address, | ||||||
|  | 					neighbours: make(map[string]*node), | ||||||
|  | 				} | ||||||
|  | 				n.Lock() | ||||||
|  | 				n.neighbours[neighbour.id] = neighbour | ||||||
|  | 				n.Unlock() | ||||||
|  | 			case "neighbour": | ||||||
|  | 				pbNetNeighbour := &pbNet.Neighbour{} | ||||||
|  | 				if err := proto.Unmarshal(m.Body, pbNetNeighbour); err != nil { | ||||||
|  | 					log.Debugf("Network tunnel [%s] neighbour unmarshal error: %v", NetworkChannel, err) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				// don't process your own messages | ||||||
|  | 				if pbNetNeighbour.Node.Id == n.options.Id { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				neighbour := &node{ | ||||||
|  | 					id:         pbNetNeighbour.Node.Id, | ||||||
|  | 					address:    pbNetNeighbour.Node.Address, | ||||||
|  | 					neighbours: make(map[string]*node), | ||||||
|  | 				} | ||||||
|  | 				n.Lock() | ||||||
|  | 				// we override the existing neighbour map | ||||||
|  | 				n.neighbours[neighbour.id] = neighbour | ||||||
|  | 				// store the neighbouring node and its neighbours | ||||||
|  | 				for _, pbNeighbour := range pbNetNeighbour.Neighbours { | ||||||
|  | 					neighbourNode := &node{ | ||||||
|  | 						id:      pbNeighbour.Id, | ||||||
|  | 						address: pbNeighbour.Address, | ||||||
|  | 					} | ||||||
|  | 					n.neighbours[neighbour.id].neighbours[neighbourNode.id] = neighbourNode | ||||||
|  | 				} | ||||||
|  | 				n.Unlock() | ||||||
|  | 			case "close": | ||||||
|  | 				pbNetClose := &pbNet.Close{} | ||||||
|  | 				if err := proto.Unmarshal(m.Body, pbNetClose); err != nil { | ||||||
|  | 					log.Debugf("Network tunnel [%s] close unmarshal error: %v", NetworkChannel, err) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				// don't process your own messages | ||||||
|  | 				if pbNetClose.Node.Id == n.options.Id { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				n.Lock() | ||||||
|  | 				delete(n.neighbours, pbNetClose.Node.Id) | ||||||
|  | 				n.Unlock() | ||||||
|  | 			} | ||||||
|  | 		case <-n.closed: | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // announce announces node neighbourhood to the network | ||||||
|  | func (n *network) announce(client transport.Client) { | ||||||
|  | 	announce := time.NewTicker(AnnounceTime) | ||||||
|  | 	defer announce.Stop() | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-n.closed: | ||||||
|  | 			return | ||||||
|  | 		case <-announce.C: | ||||||
|  | 			n.RLock() | ||||||
|  | 			nodes := make([]*pbNet.Node, len(n.neighbours)) | ||||||
|  | 			i := 0 | ||||||
|  | 			for id, _ := range n.neighbours { | ||||||
|  | 				pbNode := &pbNet.Node{ | ||||||
|  | 					Id:      id, | ||||||
|  | 					Address: n.neighbours[id].address, | ||||||
|  | 				} | ||||||
|  | 				nodes[i] = pbNode | ||||||
|  | 			} | ||||||
|  | 			n.RUnlock() | ||||||
|  |  | ||||||
|  | 			node := &pbNet.Node{ | ||||||
|  | 				Id:      n.options.Id, | ||||||
|  | 				Address: n.options.Address, | ||||||
|  | 			} | ||||||
|  | 			pbNetNeighbour := &pbNet.Neighbour{ | ||||||
|  | 				Node:       node, | ||||||
|  | 				Neighbours: nodes, | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			body, err := proto.Marshal(pbNetNeighbour) | ||||||
|  | 			if err != nil { | ||||||
|  | 				// TODO: should we bail here? | ||||||
|  | 				log.Debugf("Network failed to marshal neighbour message: %v", err) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 			// create transport message and chuck it down the pipe | ||||||
|  | 			m := transport.Message{ | ||||||
|  | 				Header: map[string]string{ | ||||||
|  | 					"Micro-Method": "neighbour", | ||||||
|  | 				}, | ||||||
|  | 				Body: body, | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if err := client.Send(&m); err != nil { | ||||||
|  | 				log.Debugf("Network failed to send neighbour messsage: %v", err) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // handleCtrlConn handles ControlChannel connections | ||||||
|  | func (n *network) handleCtrlConn(conn tunnel.Conn, msg chan *transport.Message) { | ||||||
| 	for { | 	for { | ||||||
| 		m := new(transport.Message) | 		m := new(transport.Message) | ||||||
| 		if err := conn.Recv(m); err != nil { | 		if err := conn.Recv(m); err != nil { | ||||||
| @@ -162,19 +355,34 @@ func (n *network) handleConn(conn tunnel.Conn, msg chan *transport.Message) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (n *network) process(l tunnel.Listener) { | // acceptCtrlConn accepts connections from ControlChannel | ||||||
| 	// receive control message queue | func (n *network) acceptCtrlConn(l tunnel.Listener, recv chan *transport.Message) { | ||||||
| 	recv := make(chan *transport.Message, 128) | 	for { | ||||||
|  |  | ||||||
| 		// accept a connection | 		// accept a connection | ||||||
| 		conn, err := l.Accept() | 		conn, err := l.Accept() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			// TODO: handle this | 			// TODO: handle this | ||||||
| 		log.Debugf("Network tunnel accept error: %v", err) | 			log.Debugf("Network tunnel [%s] accept error: %v", ControlChannel, err) | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	go n.handleConn(conn, recv) | 		select { | ||||||
|  | 		case <-n.closed: | ||||||
|  | 			return | ||||||
|  | 		default: | ||||||
|  | 			// go handle ControlChannel connection | ||||||
|  | 			go n.handleCtrlConn(conn, recv) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // process processes network advertisements | ||||||
|  | func (n *network) processCtrlChan(l tunnel.Listener) { | ||||||
|  | 	// receive control message queue | ||||||
|  | 	recv := make(chan *transport.Message, 128) | ||||||
|  |  | ||||||
|  | 	// accept ControlChannel cconnections | ||||||
|  | 	go n.acceptCtrlConn(l, recv) | ||||||
|  |  | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| @@ -182,13 +390,13 @@ func (n *network) process(l tunnel.Listener) { | |||||||
| 			// switch on type of message and take action | 			// switch on type of message and take action | ||||||
| 			switch m.Header["Micro-Method"] { | 			switch m.Header["Micro-Method"] { | ||||||
| 			case "advert": | 			case "advert": | ||||||
| 				pbAdvert := &pb.Advert{} | 				pbRtrAdvert := &pbRtr.Advert{} | ||||||
| 				if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { | 				if err := proto.Unmarshal(m.Body, pbRtrAdvert); err != nil { | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| 				var events []*router.Event | 				var events []*router.Event | ||||||
| 				for _, event := range pbAdvert.Events { | 				for _, event := range pbRtrAdvert.Events { | ||||||
| 					route := router.Route{ | 					route := router.Route{ | ||||||
| 						Service: event.Route.Service, | 						Service: event.Route.Service, | ||||||
| 						Address: event.Route.Address, | 						Address: event.Route.Address, | ||||||
| @@ -200,16 +408,16 @@ func (n *network) process(l tunnel.Listener) { | |||||||
| 					} | 					} | ||||||
| 					e := &router.Event{ | 					e := &router.Event{ | ||||||
| 						Type:      router.EventType(event.Type), | 						Type:      router.EventType(event.Type), | ||||||
| 						Timestamp: time.Unix(0, pbAdvert.Timestamp), | 						Timestamp: time.Unix(0, pbRtrAdvert.Timestamp), | ||||||
| 						Route:     route, | 						Route:     route, | ||||||
| 					} | 					} | ||||||
| 					events = append(events, e) | 					events = append(events, e) | ||||||
| 				} | 				} | ||||||
| 				advert := &router.Advert{ | 				advert := &router.Advert{ | ||||||
| 					Id:        pbAdvert.Id, | 					Id:        pbRtrAdvert.Id, | ||||||
| 					Type:      router.AdvertType(pbAdvert.Type), | 					Type:      router.AdvertType(pbRtrAdvert.Type), | ||||||
| 					Timestamp: time.Unix(0, pbAdvert.Timestamp), | 					Timestamp: time.Unix(0, pbRtrAdvert.Timestamp), | ||||||
| 					TTL:       time.Duration(pbAdvert.Ttl), | 					TTL:       time.Duration(pbRtrAdvert.Ttl), | ||||||
| 					Events:    events, | 					Events:    events, | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
| @@ -231,10 +439,10 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A | |||||||
| 		// process local adverts and randomly fire them at other nodes | 		// process local adverts and randomly fire them at other nodes | ||||||
| 		case advert := <-advertChan: | 		case advert := <-advertChan: | ||||||
| 			// create a proto advert | 			// create a proto advert | ||||||
| 			var events []*pb.Event | 			var events []*pbRtr.Event | ||||||
| 			for _, event := range advert.Events { | 			for _, event := range advert.Events { | ||||||
| 				// NOTE: we override the Gateway and Link fields here | 				// NOTE: we override the Gateway and Link fields here | ||||||
| 				route := &pb.Route{ | 				route := &pbRtr.Route{ | ||||||
| 					Service: event.Route.Service, | 					Service: event.Route.Service, | ||||||
| 					Address: event.Route.Address, | 					Address: event.Route.Address, | ||||||
| 					Gateway: n.options.Address, | 					Gateway: n.options.Address, | ||||||
| @@ -243,23 +451,23 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A | |||||||
| 					Link:    DefaultLink, | 					Link:    DefaultLink, | ||||||
| 					Metric:  int64(event.Route.Metric), | 					Metric:  int64(event.Route.Metric), | ||||||
| 				} | 				} | ||||||
| 				e := &pb.Event{ | 				e := &pbRtr.Event{ | ||||||
| 					Type:      pb.EventType(event.Type), | 					Type:      pbRtr.EventType(event.Type), | ||||||
| 					Timestamp: event.Timestamp.UnixNano(), | 					Timestamp: event.Timestamp.UnixNano(), | ||||||
| 					Route:     route, | 					Route:     route, | ||||||
| 				} | 				} | ||||||
| 				events = append(events, e) | 				events = append(events, e) | ||||||
| 			} | 			} | ||||||
| 			pbAdvert := &pb.Advert{ | 			pbRtrAdvert := &pbRtr.Advert{ | ||||||
| 				Id:        advert.Id, | 				Id:        advert.Id, | ||||||
| 				Type:      pb.AdvertType(advert.Type), | 				Type:      pbRtr.AdvertType(advert.Type), | ||||||
| 				Timestamp: advert.Timestamp.UnixNano(), | 				Timestamp: advert.Timestamp.UnixNano(), | ||||||
| 				Events:    events, | 				Events:    events, | ||||||
| 			} | 			} | ||||||
| 			body, err := proto.Marshal(pbAdvert) | 			body, err := proto.Marshal(pbRtrAdvert) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				// TODO: should we bail here? | 				// TODO: should we bail here? | ||||||
| 				log.Debugf("Network failed to marshal message: %v", err) | 				log.Debugf("Network failed to marshal advert message: %v", err) | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 			// create transport message and chuck it down the pipe | 			// create transport message and chuck it down the pipe | ||||||
| @@ -271,7 +479,7 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A | |||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			if err := client.Send(&m); err != nil { | 			if err := client.Send(&m); err != nil { | ||||||
| 				log.Debugf("Network failed to send advert %s: %v", pbAdvert.Id, err) | 				log.Debugf("Network failed to send advert %s: %v", pbRtrAdvert.Id, err) | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 		case <-n.closed: | 		case <-n.closed: | ||||||
| @@ -307,13 +515,29 @@ func (n *network) Connect() error { | |||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	// dial into ControlChannel to send route adverts | 	// dial into ControlChannel to send route adverts | ||||||
| 	client, err := n.Tunnel.Dial(ControlChannel) | 	ctrlClient, err := n.Tunnel.Dial(ControlChannel) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	n.tunClient[ControlChannel] = ctrlClient | ||||||
|  |  | ||||||
| 	// listen on ControlChannel | 	// listen on ControlChannel | ||||||
| 	listener, err := n.Tunnel.Listen(ControlChannel) | 	ctrlListener, err := n.Tunnel.Listen(ControlChannel) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// dial into NetworkChannel to send network messages | ||||||
|  | 	netClient, err := n.Tunnel.Dial(NetworkChannel) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	n.tunClient[NetworkChannel] = netClient | ||||||
|  |  | ||||||
|  | 	// listen on NetworkChannel | ||||||
|  | 	netListener, err := n.Tunnel.Listen(NetworkChannel) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -321,9 +545,6 @@ func (n *network) Connect() error { | |||||||
| 	// create closed channel | 	// create closed channel | ||||||
| 	n.closed = make(chan bool) | 	n.closed = make(chan bool) | ||||||
|  |  | ||||||
| 	// keep resolving network nodes |  | ||||||
| 	go n.resolve() |  | ||||||
|  |  | ||||||
| 	// start the router | 	// start the router | ||||||
| 	if err := n.options.Router.Start(); err != nil { | 	if err := n.options.Router.Start(); err != nil { | ||||||
| 		return err | 		return err | ||||||
| @@ -335,19 +556,48 @@ func (n *network) Connect() error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// advertise routes |  | ||||||
| 	go n.advertise(client, advertChan) |  | ||||||
| 	// accept and process routes |  | ||||||
| 	go n.process(listener) |  | ||||||
|  |  | ||||||
| 	// start the server | 	// start the server | ||||||
| 	if err := n.server.Start(); err != nil { | 	if err := n.server.Start(); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// go resolving network nodes | ||||||
|  | 	go n.resolve() | ||||||
|  | 	// broadcast neighbourhood | ||||||
|  | 	go n.announce(netClient) | ||||||
|  | 	// listen to network messages | ||||||
|  | 	go n.processNetChan(netListener) | ||||||
|  | 	// advertise service routes | ||||||
|  | 	go n.advertise(ctrlClient, advertChan) | ||||||
|  | 	// accept and process routes | ||||||
|  | 	go n.processCtrlChan(ctrlListener) | ||||||
|  |  | ||||||
| 	// set connected to true | 	// set connected to true | ||||||
| 	n.connected = true | 	n.connected = true | ||||||
|  |  | ||||||
|  | 	// send connect message to NetworkChannel | ||||||
|  | 	node := &pbNet.Node{ | ||||||
|  | 		Id:      n.options.Id, | ||||||
|  | 		Address: n.options.Address, | ||||||
|  | 	} | ||||||
|  | 	pbNetConnect := &pbNet.Connect{ | ||||||
|  | 		Node: node, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// only proceed with sending to NetworkChannel if marshal succeeds | ||||||
|  | 	if body, err := proto.Marshal(pbNetConnect); err == nil { | ||||||
|  | 		m := transport.Message{ | ||||||
|  | 			Header: map[string]string{ | ||||||
|  | 				"Micro-Method": "connect", | ||||||
|  | 			}, | ||||||
|  | 			Body: body, | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		if err := netClient.Send(&m); err != nil { | ||||||
|  | 			log.Debugf("Network failed to send connect messsage: %v", err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -383,11 +633,39 @@ func (n *network) Close() error { | |||||||
| 	case <-n.closed: | 	case <-n.closed: | ||||||
| 		return nil | 		return nil | ||||||
| 	default: | 	default: | ||||||
|  | 		// TODO: send close message to the network channel | ||||||
| 		close(n.closed) | 		close(n.closed) | ||||||
| 		// set connected to false | 		// set connected to false | ||||||
| 		n.connected = false | 		n.connected = false | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// send close message only if we managed to connect to NetworkChannel | ||||||
|  | 	if netClient, ok := n.tunClient[NetworkChannel]; ok { | ||||||
|  | 		// send connect message to NetworkChannel | ||||||
|  | 		node := &pbNet.Node{ | ||||||
|  | 			Id:      n.options.Id, | ||||||
|  | 			Address: n.options.Address, | ||||||
|  | 		} | ||||||
|  | 		pbNetClose := &pbNet.Close{ | ||||||
|  | 			Node: node, | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// only proceed with sending to NetworkChannel if marshal succeeds | ||||||
|  | 		if body, err := proto.Marshal(pbNetClose); err == nil { | ||||||
|  | 			// create transport message and chuck it down the pipe | ||||||
|  | 			m := transport.Message{ | ||||||
|  | 				Header: map[string]string{ | ||||||
|  | 					"Micro-Method": "close", | ||||||
|  | 				}, | ||||||
|  | 				Body: body, | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if err := netClient.Send(&m); err != nil { | ||||||
|  | 				log.Debugf("Network failed to send close messsage: %v", err) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return n.close() | 	return n.close() | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -13,8 +13,10 @@ var ( | |||||||
| 	DefaultName = "go.micro" | 	DefaultName = "go.micro" | ||||||
| 	// DefaultAddress is default network address | 	// DefaultAddress is default network address | ||||||
| 	DefaultAddress = ":0" | 	DefaultAddress = ":0" | ||||||
| 	// ResolveTime ddefines the time to periodically resolve network nodes | 	// ResolveTime defines time interval to periodically resolve network nodes | ||||||
| 	ResolveTime = 1 * time.Minute | 	ResolveTime = 1 * time.Minute | ||||||
|  | 	// AnnounceTime defines time interval to periodically announce node neighbours | ||||||
|  | 	AnnounceTime = 30 * time.Second | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Network is micro network | // Network is micro network | ||||||
|   | |||||||
							
								
								
									
										21
									
								
								network/proto/network.micro.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										21
									
								
								network/proto/network.micro.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,21 @@ | |||||||
|  | // Code generated by protoc-gen-micro. DO NOT EDIT. | ||||||
|  | // source: network.proto | ||||||
|  |  | ||||||
|  | package go_micro_network | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	fmt "fmt" | ||||||
|  | 	proto "github.com/golang/protobuf/proto" | ||||||
|  | 	math "math" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Reference imports to suppress errors if they are not otherwise used. | ||||||
|  | var _ = proto.Marshal | ||||||
|  | var _ = fmt.Errorf | ||||||
|  | var _ = math.Inf | ||||||
|  |  | ||||||
|  | // This is a compile-time assertion to ensure that this generated file | ||||||
|  | // is compatible with the proto package it is being compiled against. | ||||||
|  | // A compilation error at this line likely means your copy of the | ||||||
|  | // proto package needs to be updated. | ||||||
|  | const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package | ||||||
							
								
								
									
										227
									
								
								network/proto/network.pb.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										227
									
								
								network/proto/network.pb.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,227 @@ | |||||||
|  | // Code generated by protoc-gen-go. DO NOT EDIT. | ||||||
|  | // source: network.proto | ||||||
|  |  | ||||||
|  | package go_micro_network | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	fmt "fmt" | ||||||
|  | 	proto "github.com/golang/protobuf/proto" | ||||||
|  | 	math "math" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Reference imports to suppress errors if they are not otherwise used. | ||||||
|  | var _ = proto.Marshal | ||||||
|  | var _ = fmt.Errorf | ||||||
|  | var _ = math.Inf | ||||||
|  |  | ||||||
|  | // This is a compile-time assertion to ensure that this generated file | ||||||
|  | // is compatible with the proto package it is being compiled against. | ||||||
|  | // A compilation error at this line likely means your copy of the | ||||||
|  | // proto package needs to be updated. | ||||||
|  | const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package | ||||||
|  |  | ||||||
|  | // Node is network node | ||||||
|  | type Node struct { | ||||||
|  | 	// node ide | ||||||
|  | 	Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` | ||||||
|  | 	// node address | ||||||
|  | 	Address              string   `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Node) Reset()         { *m = Node{} } | ||||||
|  | func (m *Node) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*Node) ProtoMessage()    {} | ||||||
|  | func (*Node) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_8571034d60397816, []int{0} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Node) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_Node.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *Node) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_Node.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *Node) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_Node.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *Node) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_Node.Size(m) | ||||||
|  | } | ||||||
|  | func (m *Node) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_Node.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_Node proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *Node) GetId() string { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Id | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Node) GetAddress() string { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Address | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Connect is sent when the node connects to the network | ||||||
|  | type Connect struct { | ||||||
|  | 	// network mode | ||||||
|  | 	Node                 *Node    `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Connect) Reset()         { *m = Connect{} } | ||||||
|  | func (m *Connect) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*Connect) ProtoMessage()    {} | ||||||
|  | func (*Connect) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_8571034d60397816, []int{1} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Connect) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_Connect.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *Connect) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_Connect.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *Connect) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_Connect.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *Connect) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_Connect.Size(m) | ||||||
|  | } | ||||||
|  | func (m *Connect) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_Connect.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_Connect proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *Connect) GetNode() *Node { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Node | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Close is sent when the node disconnects from the network | ||||||
|  | type Close struct { | ||||||
|  | 	// network mode | ||||||
|  | 	Node                 *Node    `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Close) Reset()         { *m = Close{} } | ||||||
|  | func (m *Close) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*Close) ProtoMessage()    {} | ||||||
|  | func (*Close) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_8571034d60397816, []int{2} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Close) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_Close.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *Close) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_Close.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *Close) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_Close.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *Close) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_Close.Size(m) | ||||||
|  | } | ||||||
|  | func (m *Close) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_Close.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_Close proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *Close) GetNode() *Node { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Node | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Neighbour is used to nnounce node neighbourhood | ||||||
|  | type Neighbour struct { | ||||||
|  | 	// network mode | ||||||
|  | 	Node *Node `protobuf:"bytes,1,opt,name=node,proto3" json:"node,omitempty"` | ||||||
|  | 	// neighbours | ||||||
|  | 	Neighbours           []*Node  `protobuf:"bytes,3,rep,name=neighbours,proto3" json:"neighbours,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Neighbour) Reset()         { *m = Neighbour{} } | ||||||
|  | func (m *Neighbour) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*Neighbour) ProtoMessage()    {} | ||||||
|  | func (*Neighbour) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_8571034d60397816, []int{3} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Neighbour) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_Neighbour.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *Neighbour) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_Neighbour.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *Neighbour) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_Neighbour.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *Neighbour) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_Neighbour.Size(m) | ||||||
|  | } | ||||||
|  | func (m *Neighbour) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_Neighbour.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_Neighbour proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *Neighbour) GetNode() *Node { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Node | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Neighbour) GetNeighbours() []*Node { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Neighbours | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	proto.RegisterType((*Node)(nil), "go.micro.network.Node") | ||||||
|  | 	proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") | ||||||
|  | 	proto.RegisterType((*Close)(nil), "go.micro.network.Close") | ||||||
|  | 	proto.RegisterType((*Neighbour)(nil), "go.micro.network.Neighbour") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } | ||||||
|  |  | ||||||
|  | var fileDescriptor_8571034d60397816 = []byte{ | ||||||
|  | 	// 173 bytes of a gzipped FileDescriptorProto | ||||||
|  | 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x4b, 0x2d, 0x29, | ||||||
|  | 	0xcf, 0x2f, 0xca, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0x48, 0xcf, 0xd7, 0xcb, 0xcd, | ||||||
|  | 	0x4c, 0x2e, 0xca, 0xd7, 0x83, 0x8a, 0x2b, 0x19, 0x70, 0xb1, 0xf8, 0xe5, 0xa7, 0xa4, 0x0a, 0xf1, | ||||||
|  | 	0x71, 0x31, 0x65, 0xa6, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x31, 0x65, 0xa6, 0x08, 0x49, | ||||||
|  | 	0x70, 0xb1, 0x27, 0xa6, 0xa4, 0x14, 0xa5, 0x16, 0x17, 0x4b, 0x30, 0x81, 0x05, 0x61, 0x5c, 0x25, | ||||||
|  | 	0x53, 0x2e, 0x76, 0xe7, 0xfc, 0xbc, 0xbc, 0xd4, 0xe4, 0x12, 0x21, 0x2d, 0x2e, 0x96, 0xbc, 0xfc, | ||||||
|  | 	0x94, 0x54, 0xb0, 0x36, 0x6e, 0x23, 0x31, 0x3d, 0x74, 0xd3, 0xf5, 0x40, 0x46, 0x07, 0x81, 0xd5, | ||||||
|  | 	0x28, 0x19, 0x73, 0xb1, 0x3a, 0xe7, 0xe4, 0x17, 0xa7, 0x92, 0xa4, 0x29, 0x9f, 0x8b, 0xd3, 0x2f, | ||||||
|  | 	0x35, 0x33, 0x3d, 0x23, 0x29, 0xbf, 0xb4, 0x88, 0x14, 0x8d, 0x42, 0x66, 0x5c, 0x5c, 0x79, 0x30, | ||||||
|  | 	0x8d, 0xc5, 0x12, 0xcc, 0x0a, 0xcc, 0x78, 0x74, 0x20, 0xa9, 0x4c, 0x62, 0x03, 0x87, 0x93, 0x31, | ||||||
|  | 	0x20, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x74, 0x00, 0x71, 0x38, 0x01, 0x00, 0x00, | ||||||
|  | } | ||||||
							
								
								
									
										31
									
								
								network/proto/network.proto
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										31
									
								
								network/proto/network.proto
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,31 @@ | |||||||
|  | syntax = "proto3"; | ||||||
|  |  | ||||||
|  | package go.micro.network; | ||||||
|  |  | ||||||
|  | // Node is network node | ||||||
|  | message Node { | ||||||
|  |         // node ide | ||||||
|  |         string id = 1; | ||||||
|  |         // node address | ||||||
|  |         string address = 2; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Connect is sent when the node connects to the network | ||||||
|  | message Connect { | ||||||
|  |         // network mode | ||||||
|  |         Node node = 1; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Close is sent when the node disconnects from the network | ||||||
|  | message Close { | ||||||
|  |         // network mode | ||||||
|  |         Node node = 1; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Neighbour is used to nnounce node neighbourhood | ||||||
|  | message Neighbour { | ||||||
|  |         // network mode | ||||||
|  |         Node node = 1; | ||||||
|  |         // neighbours | ||||||
|  |         repeated Node neighbours = 3; | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user