Merge pull request #849 from micro/connect-init

Connect init
This commit is contained in:
Asim Aslam 2019-10-13 18:40:11 +01:00 committed by GitHub
commit b701da6d69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 331 additions and 87 deletions

View File

@ -144,6 +144,18 @@ func newNetwork(opts ...Option) Network {
return network return network
} }
func (n *network) Init(opts ...Option) error {
n.Lock()
defer n.Unlock()
// TODO: maybe only allow reinit of certain opts
for _, o := range opts {
o(&n.options)
}
return nil
}
// Options returns network options // Options returns network options
func (n *network) Options() Options { func (n *network) Options() Options {
n.RLock() n.RLock()
@ -163,9 +175,6 @@ func (n *network) Name() string {
func (n *network) resolveNodes() ([]string, error) { func (n *network) resolveNodes() ([]string, error) {
// resolve the network address to network nodes // resolve the network address to network nodes
records, err := n.options.Resolver.Resolve(n.options.Name) records, err := n.options.Resolver.Resolve(n.options.Name)
if err != nil {
return nil, err
}
nodeMap := make(map[string]bool) nodeMap := make(map[string]bool)
@ -197,6 +206,7 @@ func (n *network) resolveNodes() ([]string, error) {
// resolve anything that looks like a host name // resolve anything that looks like a host name
records, err := dns.Resolve(node) records, err := dns.Resolve(node)
if err != nil { if err != nil {
log.Debugf("Failed to resolve %v %v", node, err)
continue continue
} }
@ -208,7 +218,7 @@ func (n *network) resolveNodes() ([]string, error) {
} }
} }
return nodes, nil return nodes, err
} }
// resolve continuously resolves network nodes and initializes network tunnel with resolved addresses // resolve continuously resolves network nodes and initializes network tunnel with resolved addresses
@ -335,7 +345,7 @@ func (n *network) processNetChan(client transport.Client, listener tunnel.Listen
if pbNetPeer.Node.Id == n.options.Id { if pbNetPeer.Node.Id == n.options.Id {
continue continue
} }
log.Debugf("Network received peer message from: %s", pbNetPeer.Node.Id) log.Debugf("Network received peer message from: %s %s", pbNetPeer.Node.Id, pbNetPeer.Node.Address)
peer := &node{ peer := &node{
id: pbNetPeer.Node.Id, id: pbNetPeer.Node.Id,
address: pbNetPeer.Node.Address, address: pbNetPeer.Node.Address,
@ -759,14 +769,25 @@ func (n *network) advertise(client transport.Client, advertChan <-chan *router.A
} }
} }
func (n *network) sendConnect() {
// send connect message to NetworkChannel
// NOTE: in theory we could do this as soon as
// Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first
msg := &pbNet.Connect{
Node: &pbNet.Node{
Id: n.node.id,
Address: n.node.address,
},
}
if err := n.sendMsg("connect", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to send connect message: %s", err)
}
}
// Connect connects the network // Connect connects the network
func (n *network) Connect() error { func (n *network) Connect() error {
n.Lock() n.Lock()
// return if already connected
if n.connected {
n.Unlock()
return nil
}
// try to resolve network nodes // try to resolve network nodes
nodes, err := n.resolveNodes() nodes, err := n.resolveNodes()
@ -785,6 +806,15 @@ func (n *network) Connect() error {
return err return err
} }
// return if already connected
if n.connected {
// unlock first
n.Unlock()
// send the connect message
n.sendConnect()
return nil
}
// set our internal node address // set our internal node address
// if advertise address is not set // if advertise address is not set
if len(n.options.Advertise) == 0 { if len(n.options.Advertise) == 0 {
@ -846,19 +876,8 @@ func (n *network) Connect() error {
} }
n.Unlock() n.Unlock()
// send connect message to NetworkChannel // send the connect message
// NOTE: in theory we could do this as soon as n.sendConnect()
// Dial to NetworkChannel succeeds, but instead
// we initialize all other node resources first
msg := &pbNet.Connect{
Node: &pbNet.Node{
Id: n.node.id,
Address: n.node.address,
},
}
if err := n.sendMsg("connect", msg, NetworkChannel); err != nil {
log.Debugf("Network failed to send connect message: %s", err)
}
// go resolving network nodes // go resolving network nodes
go n.resolve() go n.resolve()

View File

@ -38,6 +38,8 @@ type Node interface {
type Network interface { type Network interface {
// Node is network node // Node is network node
Node Node
// Initialise options
Init(...Option) error
// Options returns the network options // Options returns the network options
Options() Options Options() Options
// Name of the network // Name of the network

View File

@ -35,6 +35,8 @@ var _ server.Option
// Client API for Network service // Client API for Network service
type NetworkService interface { type NetworkService interface {
// Connect to the network
Connect(ctx context.Context, in *ConnectRequest, opts ...client.CallOption) (*ConnectResponse, error)
// Returns the entire network graph // Returns the entire network graph
Graph(ctx context.Context, in *GraphRequest, opts ...client.CallOption) (*GraphResponse, error) Graph(ctx context.Context, in *GraphRequest, opts ...client.CallOption) (*GraphResponse, error)
// Returns a list of known nodes in the network // Returns a list of known nodes in the network
@ -63,6 +65,16 @@ func NewNetworkService(name string, c client.Client) NetworkService {
} }
} }
func (c *networkService) Connect(ctx context.Context, in *ConnectRequest, opts ...client.CallOption) (*ConnectResponse, error) {
req := c.c.NewRequest(c.name, "Network.Connect", in)
out := new(ConnectResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *networkService) Graph(ctx context.Context, in *GraphRequest, opts ...client.CallOption) (*GraphResponse, error) { func (c *networkService) Graph(ctx context.Context, in *GraphRequest, opts ...client.CallOption) (*GraphResponse, error) {
req := c.c.NewRequest(c.name, "Network.Graph", in) req := c.c.NewRequest(c.name, "Network.Graph", in)
out := new(GraphResponse) out := new(GraphResponse)
@ -106,6 +118,8 @@ func (c *networkService) Services(ctx context.Context, in *ServicesRequest, opts
// Server API for Network service // Server API for Network service
type NetworkHandler interface { type NetworkHandler interface {
// Connect to the network
Connect(context.Context, *ConnectRequest, *ConnectResponse) error
// Returns the entire network graph // Returns the entire network graph
Graph(context.Context, *GraphRequest, *GraphResponse) error Graph(context.Context, *GraphRequest, *GraphResponse) error
// Returns a list of known nodes in the network // Returns a list of known nodes in the network
@ -118,6 +132,7 @@ type NetworkHandler interface {
func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error { func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error {
type network interface { type network interface {
Connect(ctx context.Context, in *ConnectRequest, out *ConnectResponse) error
Graph(ctx context.Context, in *GraphRequest, out *GraphResponse) error Graph(ctx context.Context, in *GraphRequest, out *GraphResponse) error
Nodes(ctx context.Context, in *NodesRequest, out *NodesResponse) error Nodes(ctx context.Context, in *NodesRequest, out *NodesResponse) error
Routes(ctx context.Context, in *RoutesRequest, out *RoutesResponse) error Routes(ctx context.Context, in *RoutesRequest, out *RoutesResponse) error
@ -134,6 +149,10 @@ type networkHandler struct {
NetworkHandler NetworkHandler
} }
func (h *networkHandler) Connect(ctx context.Context, in *ConnectRequest, out *ConnectResponse) error {
return h.NetworkHandler.Connect(ctx, in, out)
}
func (h *networkHandler) Graph(ctx context.Context, in *GraphRequest, out *GraphResponse) error { func (h *networkHandler) Graph(ctx context.Context, in *GraphRequest, out *GraphResponse) error {
return h.NetworkHandler.Graph(ctx, in, out) return h.NetworkHandler.Graph(ctx, in, out)
} }

View File

@ -95,6 +95,76 @@ func (m *Query) GetNetwork() string {
return "" return ""
} }
type ConnectRequest struct {
Nodes []*Node `protobuf:"bytes,1,rep,name=nodes,proto3" json:"nodes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ConnectRequest) Reset() { *m = ConnectRequest{} }
func (m *ConnectRequest) String() string { return proto.CompactTextString(m) }
func (*ConnectRequest) ProtoMessage() {}
func (*ConnectRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{1}
}
func (m *ConnectRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ConnectRequest.Unmarshal(m, b)
}
func (m *ConnectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ConnectRequest.Marshal(b, m, deterministic)
}
func (m *ConnectRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ConnectRequest.Merge(m, src)
}
func (m *ConnectRequest) XXX_Size() int {
return xxx_messageInfo_ConnectRequest.Size(m)
}
func (m *ConnectRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ConnectRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ConnectRequest proto.InternalMessageInfo
func (m *ConnectRequest) GetNodes() []*Node {
if m != nil {
return m.Nodes
}
return nil
}
type ConnectResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ConnectResponse) Reset() { *m = ConnectResponse{} }
func (m *ConnectResponse) String() string { return proto.CompactTextString(m) }
func (*ConnectResponse) ProtoMessage() {}
func (*ConnectResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{2}
}
func (m *ConnectResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ConnectResponse.Unmarshal(m, b)
}
func (m *ConnectResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ConnectResponse.Marshal(b, m, deterministic)
}
func (m *ConnectResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ConnectResponse.Merge(m, src)
}
func (m *ConnectResponse) XXX_Size() int {
return xxx_messageInfo_ConnectResponse.Size(m)
}
func (m *ConnectResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ConnectResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ConnectResponse proto.InternalMessageInfo
// PeerRequest requests list of peers // PeerRequest requests list of peers
type NodesRequest struct { type NodesRequest struct {
// node topology depth // node topology depth
@ -108,7 +178,7 @@ func (m *NodesRequest) Reset() { *m = NodesRequest{} }
func (m *NodesRequest) String() string { return proto.CompactTextString(m) } func (m *NodesRequest) String() string { return proto.CompactTextString(m) }
func (*NodesRequest) ProtoMessage() {} func (*NodesRequest) ProtoMessage() {}
func (*NodesRequest) Descriptor() ([]byte, []int) { func (*NodesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{1} return fileDescriptor_0b7953b26a7c4730, []int{3}
} }
func (m *NodesRequest) XXX_Unmarshal(b []byte) error { func (m *NodesRequest) XXX_Unmarshal(b []byte) error {
@ -149,7 +219,7 @@ func (m *NodesResponse) Reset() { *m = NodesResponse{} }
func (m *NodesResponse) String() string { return proto.CompactTextString(m) } func (m *NodesResponse) String() string { return proto.CompactTextString(m) }
func (*NodesResponse) ProtoMessage() {} func (*NodesResponse) ProtoMessage() {}
func (*NodesResponse) Descriptor() ([]byte, []int) { func (*NodesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{2} return fileDescriptor_0b7953b26a7c4730, []int{4}
} }
func (m *NodesResponse) XXX_Unmarshal(b []byte) error { func (m *NodesResponse) XXX_Unmarshal(b []byte) error {
@ -189,7 +259,7 @@ func (m *GraphRequest) Reset() { *m = GraphRequest{} }
func (m *GraphRequest) String() string { return proto.CompactTextString(m) } func (m *GraphRequest) String() string { return proto.CompactTextString(m) }
func (*GraphRequest) ProtoMessage() {} func (*GraphRequest) ProtoMessage() {}
func (*GraphRequest) Descriptor() ([]byte, []int) { func (*GraphRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{3} return fileDescriptor_0b7953b26a7c4730, []int{5}
} }
func (m *GraphRequest) XXX_Unmarshal(b []byte) error { func (m *GraphRequest) XXX_Unmarshal(b []byte) error {
@ -228,7 +298,7 @@ func (m *GraphResponse) Reset() { *m = GraphResponse{} }
func (m *GraphResponse) String() string { return proto.CompactTextString(m) } func (m *GraphResponse) String() string { return proto.CompactTextString(m) }
func (*GraphResponse) ProtoMessage() {} func (*GraphResponse) ProtoMessage() {}
func (*GraphResponse) Descriptor() ([]byte, []int) { func (*GraphResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{4} return fileDescriptor_0b7953b26a7c4730, []int{6}
} }
func (m *GraphResponse) XXX_Unmarshal(b []byte) error { func (m *GraphResponse) XXX_Unmarshal(b []byte) error {
@ -268,7 +338,7 @@ func (m *RoutesRequest) Reset() { *m = RoutesRequest{} }
func (m *RoutesRequest) String() string { return proto.CompactTextString(m) } func (m *RoutesRequest) String() string { return proto.CompactTextString(m) }
func (*RoutesRequest) ProtoMessage() {} func (*RoutesRequest) ProtoMessage() {}
func (*RoutesRequest) Descriptor() ([]byte, []int) { func (*RoutesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{5} return fileDescriptor_0b7953b26a7c4730, []int{7}
} }
func (m *RoutesRequest) XXX_Unmarshal(b []byte) error { func (m *RoutesRequest) XXX_Unmarshal(b []byte) error {
@ -307,7 +377,7 @@ func (m *RoutesResponse) Reset() { *m = RoutesResponse{} }
func (m *RoutesResponse) String() string { return proto.CompactTextString(m) } func (m *RoutesResponse) String() string { return proto.CompactTextString(m) }
func (*RoutesResponse) ProtoMessage() {} func (*RoutesResponse) ProtoMessage() {}
func (*RoutesResponse) Descriptor() ([]byte, []int) { func (*RoutesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{6} return fileDescriptor_0b7953b26a7c4730, []int{8}
} }
func (m *RoutesResponse) XXX_Unmarshal(b []byte) error { func (m *RoutesResponse) XXX_Unmarshal(b []byte) error {
@ -345,7 +415,7 @@ func (m *ServicesRequest) Reset() { *m = ServicesRequest{} }
func (m *ServicesRequest) String() string { return proto.CompactTextString(m) } func (m *ServicesRequest) String() string { return proto.CompactTextString(m) }
func (*ServicesRequest) ProtoMessage() {} func (*ServicesRequest) ProtoMessage() {}
func (*ServicesRequest) Descriptor() ([]byte, []int) { func (*ServicesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{7} return fileDescriptor_0b7953b26a7c4730, []int{9}
} }
func (m *ServicesRequest) XXX_Unmarshal(b []byte) error { func (m *ServicesRequest) XXX_Unmarshal(b []byte) error {
@ -377,7 +447,7 @@ func (m *ServicesResponse) Reset() { *m = ServicesResponse{} }
func (m *ServicesResponse) String() string { return proto.CompactTextString(m) } func (m *ServicesResponse) String() string { return proto.CompactTextString(m) }
func (*ServicesResponse) ProtoMessage() {} func (*ServicesResponse) ProtoMessage() {}
func (*ServicesResponse) Descriptor() ([]byte, []int) { func (*ServicesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{8} return fileDescriptor_0b7953b26a7c4730, []int{10}
} }
func (m *ServicesResponse) XXX_Unmarshal(b []byte) error { func (m *ServicesResponse) XXX_Unmarshal(b []byte) error {
@ -410,17 +480,21 @@ type Node struct {
// node id // node id
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// node address // node address
Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` // the network
XXX_unrecognized []byte `json:"-"` Network string `protobuf:"bytes,3,opt,name=network,proto3" json:"network,omitempty"`
XXX_sizecache int32 `json:"-"` // associated metadata
Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
} }
func (m *Node) Reset() { *m = Node{} } func (m *Node) Reset() { *m = Node{} }
func (m *Node) String() string { return proto.CompactTextString(m) } func (m *Node) String() string { return proto.CompactTextString(m) }
func (*Node) ProtoMessage() {} func (*Node) ProtoMessage() {}
func (*Node) Descriptor() ([]byte, []int) { func (*Node) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{9} return fileDescriptor_0b7953b26a7c4730, []int{11}
} }
func (m *Node) XXX_Unmarshal(b []byte) error { func (m *Node) XXX_Unmarshal(b []byte) error {
@ -455,6 +529,20 @@ func (m *Node) GetAddress() string {
return "" return ""
} }
func (m *Node) GetNetwork() string {
if m != nil {
return m.Network
}
return ""
}
func (m *Node) GetMetadata() map[string]string {
if m != nil {
return m.Metadata
}
return nil
}
// Connect is sent when the node connects to the network // Connect is sent when the node connects to the network
type Connect struct { type Connect struct {
// network mode // network mode
@ -468,7 +556,7 @@ func (m *Connect) Reset() { *m = Connect{} }
func (m *Connect) String() string { return proto.CompactTextString(m) } func (m *Connect) String() string { return proto.CompactTextString(m) }
func (*Connect) ProtoMessage() {} func (*Connect) ProtoMessage() {}
func (*Connect) Descriptor() ([]byte, []int) { func (*Connect) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{10} return fileDescriptor_0b7953b26a7c4730, []int{12}
} }
func (m *Connect) XXX_Unmarshal(b []byte) error { func (m *Connect) XXX_Unmarshal(b []byte) error {
@ -509,7 +597,7 @@ func (m *Close) Reset() { *m = Close{} }
func (m *Close) String() string { return proto.CompactTextString(m) } func (m *Close) String() string { return proto.CompactTextString(m) }
func (*Close) ProtoMessage() {} func (*Close) ProtoMessage() {}
func (*Close) Descriptor() ([]byte, []int) { func (*Close) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{11} return fileDescriptor_0b7953b26a7c4730, []int{13}
} }
func (m *Close) XXX_Unmarshal(b []byte) error { func (m *Close) XXX_Unmarshal(b []byte) error {
@ -552,7 +640,7 @@ func (m *Peer) Reset() { *m = Peer{} }
func (m *Peer) String() string { return proto.CompactTextString(m) } func (m *Peer) String() string { return proto.CompactTextString(m) }
func (*Peer) ProtoMessage() {} func (*Peer) ProtoMessage() {}
func (*Peer) Descriptor() ([]byte, []int) { func (*Peer) Descriptor() ([]byte, []int) {
return fileDescriptor_0b7953b26a7c4730, []int{12} return fileDescriptor_0b7953b26a7c4730, []int{14}
} }
func (m *Peer) XXX_Unmarshal(b []byte) error { func (m *Peer) XXX_Unmarshal(b []byte) error {
@ -589,6 +677,8 @@ func (m *Peer) GetPeers() []*Peer {
func init() { func init() {
proto.RegisterType((*Query)(nil), "go.micro.network.Query") proto.RegisterType((*Query)(nil), "go.micro.network.Query")
proto.RegisterType((*ConnectRequest)(nil), "go.micro.network.ConnectRequest")
proto.RegisterType((*ConnectResponse)(nil), "go.micro.network.ConnectResponse")
proto.RegisterType((*NodesRequest)(nil), "go.micro.network.NodesRequest") proto.RegisterType((*NodesRequest)(nil), "go.micro.network.NodesRequest")
proto.RegisterType((*NodesResponse)(nil), "go.micro.network.NodesResponse") proto.RegisterType((*NodesResponse)(nil), "go.micro.network.NodesResponse")
proto.RegisterType((*GraphRequest)(nil), "go.micro.network.GraphRequest") proto.RegisterType((*GraphRequest)(nil), "go.micro.network.GraphRequest")
@ -598,6 +688,7 @@ func init() {
proto.RegisterType((*ServicesRequest)(nil), "go.micro.network.ServicesRequest") proto.RegisterType((*ServicesRequest)(nil), "go.micro.network.ServicesRequest")
proto.RegisterType((*ServicesResponse)(nil), "go.micro.network.ServicesResponse") proto.RegisterType((*ServicesResponse)(nil), "go.micro.network.ServicesResponse")
proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterType((*Node)(nil), "go.micro.network.Node")
proto.RegisterMapType((map[string]string)(nil), "go.micro.network.Node.MetadataEntry")
proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect")
proto.RegisterType((*Close)(nil), "go.micro.network.Close") proto.RegisterType((*Close)(nil), "go.micro.network.Close")
proto.RegisterType((*Peer)(nil), "go.micro.network.Peer") proto.RegisterType((*Peer)(nil), "go.micro.network.Peer")
@ -608,38 +699,43 @@ func init() {
} }
var fileDescriptor_0b7953b26a7c4730 = []byte{ var fileDescriptor_0b7953b26a7c4730 = []byte{
// 482 bytes of a gzipped FileDescriptorProto // 573 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x4d, 0x6f, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x61, 0x6a, 0xdb, 0x4c,
0x10, 0x6d, 0x3e, 0x9c, 0xb4, 0x03, 0x29, 0x65, 0x85, 0x8a, 0xe5, 0x03, 0x84, 0x15, 0x87, 0x0a, 0x10, 0x8d, 0x2c, 0xcb, 0x76, 0xe6, 0x8b, 0xfd, 0xb9, 0x4b, 0x49, 0x85, 0x7e, 0xb4, 0xee, 0xe2,
0x51, 0x07, 0x35, 0xea, 0x09, 0x81, 0x90, 0x7a, 0x40, 0x42, 0xa2, 0x82, 0xed, 0x1f, 0x20, 0x8d, 0x1f, 0xa1, 0x34, 0x32, 0xc4, 0x04, 0x4a, 0x43, 0x43, 0x20, 0x94, 0x42, 0x21, 0x21, 0x55, 0x2e,
0x47, 0x89, 0x05, 0xf5, 0xba, 0xbb, 0x1b, 0xaa, 0xfe, 0x02, 0x7e, 0x19, 0xff, 0x0b, 0xed, 0xce, 0x50, 0xc5, 0x1a, 0x6c, 0x93, 0x58, 0xeb, 0xac, 0xd6, 0x09, 0x3e, 0x41, 0x8f, 0xd0, 0x33, 0xf5,
0xd8, 0xb4, 0x89, 0x1d, 0xc1, 0xcd, 0x6f, 0xe7, 0xcd, 0x9b, 0xdd, 0x37, 0x4f, 0x86, 0xd3, 0x45, 0x56, 0x65, 0x77, 0x47, 0x8a, 0x1d, 0xcb, 0xa2, 0xf9, 0xe7, 0xd1, 0xbc, 0xf7, 0x66, 0x67, 0xe6,
0xee, 0x96, 0xab, 0xcb, 0x74, 0xae, 0xaf, 0x26, 0x57, 0xf9, 0xdc, 0xe8, 0xc9, 0x42, 0x1f, 0xd3, 0x8d, 0xe1, 0x78, 0x3c, 0x55, 0x93, 0xc5, 0x4d, 0x38, 0x12, 0xb3, 0xc1, 0x6c, 0x3a, 0x92, 0x62,
0x47, 0x81, 0xee, 0x46, 0x9b, 0xef, 0x93, 0xd2, 0x68, 0x57, 0xa3, 0x34, 0x20, 0x71, 0xb0, 0xd0, 0x30, 0x16, 0x87, 0xf6, 0x47, 0x8a, 0xea, 0x51, 0xc8, 0xdb, 0xc1, 0x5c, 0x0a, 0x55, 0x44, 0xa1,
0x69, 0x60, 0xa5, 0x7c, 0x9e, 0x4c, 0xdb, 0x85, 0x8c, 0x5e, 0x39, 0x34, 0xac, 0x43, 0x80, 0x64, 0x89, 0x58, 0x77, 0x2c, 0x42, 0x83, 0x0a, 0xe9, 0x7b, 0x30, 0xdc, 0x2e, 0x24, 0xc5, 0x42, 0xa1,
0xe4, 0xaf, 0x0e, 0x44, 0x5f, 0x57, 0x68, 0x6e, 0x45, 0x0c, 0x43, 0x8b, 0xe6, 0x67, 0x3e, 0xc7, 0x24, 0x1d, 0x1b, 0x58, 0x19, 0xfe, 0xcb, 0x01, 0xef, 0xc7, 0x02, 0xe5, 0x92, 0xf9, 0xd0, 0xcc,
0xb8, 0x33, 0xee, 0x1c, 0xed, 0xa9, 0x0a, 0xfa, 0xca, 0x2c, 0xcb, 0x0c, 0x5a, 0x1b, 0x77, 0xa9, 0x50, 0x3e, 0x4c, 0x47, 0xe8, 0x3b, 0x3d, 0xe7, 0x60, 0x37, 0xca, 0x43, 0x9d, 0x89, 0x93, 0x44,
0xc2, 0xd0, 0x57, 0x16, 0x33, 0x87, 0x37, 0xb3, 0xdb, 0xb8, 0x47, 0x15, 0x86, 0xe2, 0x10, 0x06, 0x62, 0x96, 0xf9, 0x35, 0x9b, 0xa1, 0x50, 0x67, 0xc6, 0xb1, 0xc2, 0xc7, 0x78, 0xe9, 0xbb, 0x36,
0x34, 0x27, 0xee, 0x87, 0x02, 0x23, 0xdf, 0xc1, 0xf7, 0x8d, 0x23, 0xea, 0x60, 0x28, 0x5f, 0xc2, 0x43, 0x21, 0xdb, 0x87, 0x86, 0xad, 0xe3, 0xd7, 0x4d, 0x82, 0x22, 0xcd, 0xa0, 0xf7, 0xfa, 0x9e,
0xc3, 0x73, 0x9d, 0xa1, 0x55, 0x78, 0xbd, 0x42, 0xeb, 0xc4, 0x13, 0x88, 0x32, 0x2c, 0xdd, 0x32, 0x65, 0x50, 0xc8, 0x4f, 0xa1, 0x73, 0x2e, 0xd2, 0x14, 0x47, 0x2a, 0xc2, 0xfb, 0x05, 0x66, 0x8a,
0xdc, 0x66, 0xa4, 0x08, 0xc8, 0x77, 0x30, 0x62, 0x96, 0x2d, 0x75, 0x61, 0x51, 0xbc, 0x86, 0xa8, 0x7d, 0x04, 0x2f, 0x15, 0x09, 0x66, 0xbe, 0xd3, 0x73, 0x0f, 0xfe, 0x3b, 0xda, 0x0f, 0x9f, 0xb7,
0xf0, 0x07, 0x71, 0x67, 0xdc, 0x3b, 0x7a, 0x70, 0x72, 0x98, 0xae, 0xfb, 0x92, 0x7a, 0xbe, 0x22, 0x1c, 0x5e, 0x8a, 0x04, 0x23, 0x0b, 0xe2, 0xaf, 0xe0, 0xff, 0x82, 0x9f, 0xcd, 0x45, 0x9a, 0x21,
0x92, 0x1f, 0xf2, 0xd1, 0xcc, 0xca, 0xe5, 0xf6, 0x21, 0x6f, 0x61, 0xc4, 0x2c, 0x1e, 0xf2, 0x0a, 0xef, 0xc3, 0x9e, 0x46, 0x64, 0xb9, 0xe0, 0x6b, 0xf0, 0x12, 0x9c, 0xab, 0x89, 0x69, 0xb0, 0x1d,
0xfa, 0x46, 0x6b, 0x17, 0x58, 0x8d, 0x33, 0xbe, 0x20, 0x1a, 0x15, 0x38, 0xf2, 0x3d, 0x8c, 0x94, 0xd9, 0x80, 0x7f, 0x81, 0x36, 0xa1, 0x2c, 0xed, 0x85, 0x75, 0xfb, 0xb0, 0xf7, 0x4d, 0xc6, 0xf3,
0x7f, 0x6b, 0xfd, 0x90, 0x63, 0x88, 0xae, 0xbd, 0xc3, 0xdc, 0xfd, 0x74, 0xb3, 0x3b, 0x2c, 0x40, 0x49, 0x75, 0x91, 0x13, 0x68, 0x13, 0x8a, 0x8a, 0x7c, 0x80, 0xba, 0x14, 0x42, 0x19, 0x54, 0x69,
0x11, 0x4b, 0x7e, 0x80, 0xfd, 0xaa, 0x9f, 0xa7, 0xa7, 0xec, 0x65, 0xc3, 0x1b, 0x79, 0x97, 0xa1, 0x8d, 0x2b, 0x44, 0x19, 0x19, 0x0c, 0x3f, 0x85, 0x76, 0xa4, 0xc7, 0x57, 0x34, 0x72, 0x08, 0xde,
0x81, 0x3d, 0xb6, 0xf2, 0x31, 0x3c, 0xba, 0xa0, 0xd5, 0x55, 0x77, 0x90, 0x29, 0x1c, 0xfc, 0x3d, 0xbd, 0x5e, 0x1a, 0xb1, 0xdf, 0x6c, 0xb2, 0xcd, 0x4e, 0x23, 0x8b, 0xe2, 0x67, 0xd0, 0xc9, 0xf9,
0x62, 0xd9, 0x04, 0x76, 0x79, 0xc3, 0x24, 0xbc, 0xa7, 0x6a, 0x2c, 0xdf, 0x40, 0xdf, 0xdb, 0x26, 0x54, 0x3d, 0xa4, 0xf5, 0x94, 0xf4, 0x48, 0xf6, 0x30, 0x04, 0x5a, 0x9b, 0x19, 0xee, 0xb5, 0x75,
0xf6, 0xa1, 0x9b, 0x67, 0x9c, 0x87, 0x6e, 0x9e, 0xb5, 0x47, 0x41, 0x9e, 0xc2, 0xf0, 0x4c, 0x17, 0x43, 0xfe, 0x06, 0x1e, 0x42, 0xf7, 0xe9, 0x13, 0xc9, 0x06, 0xd0, 0x22, 0xd3, 0x58, 0xe1, 0xdd,
0x05, 0xce, 0x9d, 0x77, 0xcb, 0xbb, 0xdd, 0xee, 0x56, 0xd8, 0x48, 0xe0, 0xc8, 0x29, 0x44, 0x67, 0xa8, 0x88, 0xf9, 0x1f, 0x07, 0xea, 0x7a, 0x6e, 0xac, 0x03, 0xb5, 0x69, 0x42, 0x1e, 0xab, 0x4d,
0x3f, 0x34, 0x59, 0xfc, 0xcf, 0x4d, 0xdf, 0xa0, 0xef, 0x0d, 0xff, 0x9f, 0x1e, 0x9f, 0x93, 0x12, 0x93, 0x6a, 0x7b, 0xe5, 0x66, 0x71, 0xd7, 0xcc, 0xc2, 0xce, 0xa0, 0x35, 0x43, 0x15, 0x27, 0xb1,
0xd1, 0xf8, 0x7b, 0xf7, 0xb6, 0xec, 0x90, 0x48, 0x27, 0xbf, 0xbb, 0x30, 0x3c, 0xa7, 0x73, 0xf1, 0x8a, 0xfd, 0xba, 0xe9, 0xa0, 0x5f, 0xbe, 0xa5, 0xf0, 0x82, 0x60, 0x5f, 0x53, 0x25, 0x97, 0x51,
0x09, 0xa2, 0x90, 0x06, 0xf1, 0x6c, 0xb3, 0xe7, 0x6e, 0x98, 0x92, 0xe7, 0xad, 0x75, 0x72, 0x5c, 0xc1, 0x0a, 0x4e, 0xa0, 0xbd, 0x96, 0x62, 0x5d, 0x70, 0x6f, 0x71, 0x49, 0xef, 0xd2, 0x3f, 0xf5,
0xee, 0x78, 0xad, 0x10, 0xdf, 0x26, 0xad, 0xbb, 0xe9, 0x6f, 0xd2, 0xba, 0x97, 0x7b, 0xb9, 0x23, 0x26, 0x1f, 0xe2, 0xbb, 0x05, 0xd2, 0xb3, 0x6c, 0xf0, 0xb9, 0xf6, 0xc9, 0xe1, 0xc7, 0xd0, 0x24,
0x3e, 0xc3, 0x80, 0x82, 0x22, 0x1a, 0xc8, 0xf7, 0x22, 0x98, 0x8c, 0xdb, 0x09, 0xb5, 0xdc, 0x05, 0xaf, 0xe9, 0x3d, 0x6a, 0x1f, 0x6c, 0xdf, 0xa3, 0xf1, 0x8a, 0xc1, 0xf0, 0x21, 0x78, 0xe7, 0x77,
0xec, 0x56, 0x11, 0x11, 0x2f, 0x36, 0xf9, 0x6b, 0x89, 0x4a, 0xe4, 0x36, 0x4a, 0x25, 0x7a, 0x39, 0xc2, 0x2e, 0xff, 0x9f, 0x49, 0x3f, 0xa1, 0xae, 0xad, 0xf0, 0x12, 0x8e, 0x76, 0xf0, 0x1c, 0x51,
0x08, 0x7f, 0x99, 0xe9, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x07, 0x84, 0x80, 0x26, 0xe5, 0x04, 0xea, 0x81, 0xba, 0x15, 0xee, 0xb2, 0xa0, 0xa3, 0xdf, 0x2e, 0x34, 0x2f, 0x69, 0xb0, 0x57, 0x4f,
0x00, 0x00, 0x9d, 0xf5, 0x36, 0x59, 0xeb, 0x07, 0x1a, 0xbc, 0xaf, 0x40, 0xd0, 0x09, 0xee, 0xb0, 0xef, 0xe0,
0x19, 0xe7, 0xb3, 0xb7, 0x9b, 0xe8, 0xd5, 0xc3, 0x09, 0xde, 0x6d, 0xcd, 0xaf, 0x6a, 0x99, 0x53,
0x2d, 0xd3, 0x5a, 0xbd, 0xf4, 0x32, 0xad, 0xb5, 0x1b, 0xe7, 0x3b, 0xec, 0x02, 0x1a, 0xf6, 0x28,
0x58, 0x09, 0x78, 0xed, 0xdc, 0x82, 0xde, 0x76, 0x40, 0x21, 0x77, 0x0d, 0xad, 0xfc, 0x1c, 0x58,
0xc9, 0x5c, 0x9e, 0x5d, 0x4f, 0xc0, 0xab, 0x20, 0xb9, 0xe8, 0x4d, 0xc3, 0xfc, 0x49, 0x0f, 0xff,
0x06, 0x00, 0x00, 0xff, 0xff, 0x79, 0x8a, 0x5f, 0xf0, 0x24, 0x06, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -654,6 +750,8 @@ const _ = grpc.SupportPackageIsVersion4
// //
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type NetworkClient interface { type NetworkClient interface {
// Connect to the network
Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (*ConnectResponse, error)
// Returns the entire network graph // Returns the entire network graph
Graph(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error) Graph(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error)
// Returns a list of known nodes in the network // Returns a list of known nodes in the network
@ -672,6 +770,15 @@ func NewNetworkClient(cc *grpc.ClientConn) NetworkClient {
return &networkClient{cc} return &networkClient{cc}
} }
func (c *networkClient) Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (*ConnectResponse, error) {
out := new(ConnectResponse)
err := c.cc.Invoke(ctx, "/go.micro.network.Network/Connect", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *networkClient) Graph(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error) { func (c *networkClient) Graph(ctx context.Context, in *GraphRequest, opts ...grpc.CallOption) (*GraphResponse, error) {
out := new(GraphResponse) out := new(GraphResponse)
err := c.cc.Invoke(ctx, "/go.micro.network.Network/Graph", in, out, opts...) err := c.cc.Invoke(ctx, "/go.micro.network.Network/Graph", in, out, opts...)
@ -710,6 +817,8 @@ func (c *networkClient) Services(ctx context.Context, in *ServicesRequest, opts
// NetworkServer is the server API for Network service. // NetworkServer is the server API for Network service.
type NetworkServer interface { type NetworkServer interface {
// Connect to the network
Connect(context.Context, *ConnectRequest) (*ConnectResponse, error)
// Returns the entire network graph // Returns the entire network graph
Graph(context.Context, *GraphRequest) (*GraphResponse, error) Graph(context.Context, *GraphRequest) (*GraphResponse, error)
// Returns a list of known nodes in the network // Returns a list of known nodes in the network
@ -724,6 +833,24 @@ func RegisterNetworkServer(s *grpc.Server, srv NetworkServer) {
s.RegisterService(&_Network_serviceDesc, srv) s.RegisterService(&_Network_serviceDesc, srv)
} }
func _Network_Connect_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ConnectRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(NetworkServer).Connect(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.network.Network/Connect",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(NetworkServer).Connect(ctx, req.(*ConnectRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Network_Graph_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Network_Graph_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GraphRequest) in := new(GraphRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -800,6 +927,10 @@ var _Network_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.network.Network", ServiceName: "go.micro.network.Network",
HandlerType: (*NetworkServer)(nil), HandlerType: (*NetworkServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{
MethodName: "Connect",
Handler: _Network_Connect_Handler,
},
{ {
MethodName: "Graph", MethodName: "Graph",
Handler: _Network_Graph_Handler, Handler: _Network_Graph_Handler,

View File

@ -6,6 +6,8 @@ import "github.com/micro/go-micro/router/proto/router.proto";
// Network service is usesd to gain visibility into networks // Network service is usesd to gain visibility into networks
service Network { service Network {
// Connect to the network
rpc Connect(ConnectRequest) returns (ConnectResponse) {};
// Returns the entire network graph // Returns the entire network graph
rpc Graph(GraphRequest) returns (GraphResponse) {}; rpc Graph(GraphRequest) returns (GraphResponse) {};
// Returns a list of known nodes in the network // Returns a list of known nodes in the network
@ -25,6 +27,12 @@ message Query {
string network = 5; string network = 5;
} }
message ConnectRequest {
repeated Node nodes = 1;
}
message ConnectResponse {}
// PeerRequest requests list of peers // PeerRequest requests list of peers
message NodesRequest { message NodesRequest {
// node topology depth // node topology depth
@ -67,6 +75,10 @@ message Node {
string id = 1; string id = 1;
// node address // node address
string address = 2; string address = 2;
// the network
string network = 3;
// associated metadata
map<string,string> metadata = 4;
} }
// Connect is sent when the node connects to the network // Connect is sent when the node connects to the network

View File

@ -18,6 +18,10 @@ func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
port = "8085" port = "8085"
} }
if len(host) == 0 {
host = "localhost"
}
addrs, err := net.LookupHost(host) addrs, err := net.LookupHost(host)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -9,6 +9,7 @@ import (
pbNet "github.com/micro/go-micro/network/proto" pbNet "github.com/micro/go-micro/network/proto"
"github.com/micro/go-micro/router" "github.com/micro/go-micro/router"
pbRtr "github.com/micro/go-micro/router/proto" pbRtr "github.com/micro/go-micro/router/proto"
"github.com/micro/go-micro/util/log"
) )
// Network implements network handler // Network implements network handler
@ -47,6 +48,50 @@ func flatten(n network.Node, visited map[string]bool) []network.Node {
return nodes return nodes
} }
func (n *Network) Connect(ctx context.Context, req *pbNet.ConnectRequest, resp *pbNet.ConnectResponse) error {
if len(req.Nodes) == 0 {
return nil
}
// get list of existing nodes
nodes := n.Network.Options().Peers
// generate a node map
nodeMap := make(map[string]bool)
for _, node := range nodes {
nodeMap[node] = true
}
for _, node := range req.Nodes {
// TODO: we may have been provided a network only
// so process anad resolve node.Network
if len(node.Address) == 0 {
continue
}
// already exists
if _, ok := nodeMap[node.Address]; ok {
continue
}
nodeMap[node.Address] = true
nodes = append(nodes, node.Address)
}
log.Infof("Network.Connect setting peers: %v", nodes)
// reinitialise the peers
n.Network.Init(
network.Peers(nodes...),
)
// call the connect method
n.Network.Connect()
return nil
}
// Nodes returns the list of nodes // Nodes returns the list of nodes
func (n *Network) Nodes(ctx context.Context, req *pbNet.NodesRequest, resp *pbNet.NodesResponse) error { func (n *Network) Nodes(ctx context.Context, req *pbNet.NodesRequest, resp *pbNet.NodesResponse) error {
depth := uint(req.Depth) depth := uint(req.Depth)

View File

@ -777,6 +777,30 @@ func (t *tun) setupLink(node string) (*link, error) {
return link, nil return link, nil
} }
func (t *tun) setupLinks() {
for _, node := range t.options.Nodes {
// skip zero length nodes
if len(node) == 0 {
continue
}
// link already exists
if _, ok := t.links[node]; ok {
continue
}
// connect to node and return link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to establish node link to %s: %v", node, err)
continue
}
// save the link
t.links[node] = link
}
}
// connect the tunnel to all the nodes and listen for incoming tunnel connections // connect the tunnel to all the nodes and listen for incoming tunnel connections
func (t *tun) connect() error { func (t *tun) connect() error {
l, err := t.options.Transport.Listen(t.options.Address) l, err := t.options.Transport.Listen(t.options.Address)
@ -816,22 +840,8 @@ func (t *tun) connect() error {
} }
}() }()
for _, node := range t.options.Nodes { // setup links
// skip zero length nodes t.setupLinks()
if len(node) == 0 {
continue
}
// connect to node and return link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to establish node link to %s: %v", node, err)
continue
}
// save the link
t.links[node] = link
}
// process outbound messages to be sent // process outbound messages to be sent
// process sends to all links // process sends to all links
@ -850,6 +860,8 @@ func (t *tun) Connect() error {
// already connected // already connected
if t.connected { if t.connected {
// setup links
t.setupLinks()
return nil return nil
} }