Added Status method to network.Node fixed random segfaults.

This commit is contained in:
Milos Gajdos 2020-01-14 18:12:36 +00:00
parent 994d371ff1
commit 821fda41ae
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
7 changed files with 432 additions and 84 deletions

View File

@ -165,6 +165,7 @@ func newNetwork(opts ...Option) Network {
id: options.Id, id: options.Id,
address: peerAddress, address: peerAddress,
peers: make(map[string]*node), peers: make(map[string]*node),
status: newStatus(),
}, },
options: options, options: options,
router: options.Router, router: options.Router,
@ -780,6 +781,7 @@ func (n *network) processNetChan(listener tunnel.Listener) {
address: pbNetConnect.Node.Address, address: pbNetConnect.Node.Address,
link: m.msg.Header["Micro-Link"], link: m.msg.Header["Micro-Link"],
peers: make(map[string]*node), peers: make(map[string]*node),
status: newStatus(),
lastSeen: now, lastSeen: now,
} }
@ -878,6 +880,12 @@ func (n *network) processNetChan(listener tunnel.Listener) {
address: pbNetPeer.Node.Address, address: pbNetPeer.Node.Address,
link: m.msg.Header["Micro-Link"], link: m.msg.Header["Micro-Link"],
peers: make(map[string]*node), peers: make(map[string]*node),
status: &status{
err: &nerr{
count: int(pbNetPeer.Node.Status.Error.Count),
msg: errors.New(pbNetPeer.Node.Status.Error.Msg),
},
},
lastSeen: now, lastSeen: now,
} }
@ -977,6 +985,12 @@ func (n *network) processNetChan(listener tunnel.Listener) {
address: pbNetSync.Peer.Node.Address, address: pbNetSync.Peer.Node.Address,
link: m.msg.Header["Micro-Link"], link: m.msg.Header["Micro-Link"],
peers: make(map[string]*node), peers: make(map[string]*node),
status: &status{
err: &nerr{
count: int(pbNetSync.Peer.Node.Status.Error.Count),
msg: errors.New(pbNetSync.Peer.Node.Status.Error.Msg),
},
},
lastSeen: now, lastSeen: now,
} }
@ -1328,13 +1342,19 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message)
if err != nil { if err != nil {
return err return err
} }
// Create a unicast connection to the peer but don't do the open/accept flow // Create a unicast connection to the peer but don't do the open/accept flow
c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link)) c, err := n.tunnel.Dial(channel, tunnel.DialWait(false), tunnel.DialLink(peer.link))
if err != nil { if err != nil {
// increment the peer error count; prune peer if we exceed MaxPeerErrors if peerNode := n.GetPeerNode(peer.id); peerNode != nil {
peer.err.Increment() log.Debugf("Network found peer %s: %v", peer.id, peerNode)
if count := peer.err.GetCount(); count == MaxPeerErrors { // update node status when error happens
n.PrunePeer(peer.id) peerNode.status.err.Update(err)
log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count())
if count := peerNode.status.Error().Count(); count == MaxPeerErrors {
log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count())
n.PrunePeer(peerNode.id)
}
} }
return err return err
} }
@ -1361,10 +1381,16 @@ func (n *network) sendTo(method, channel string, peer *node, msg proto.Message)
} }
if err := c.Send(tmsg); err != nil { if err := c.Send(tmsg); err != nil {
// increment the peer error count; prune peer if we exceed MaxPeerErrors // TODO: Lookup peer in our graph
peer.err.Increment() if peerNode := n.GetPeerNode(peer.id); peerNode != nil {
if count := peer.err.GetCount(); count == MaxPeerErrors { log.Debugf("Network found peer %s: %v", peer.id, peerNode)
n.PrunePeer(peer.id) // update node status when error happens
peerNode.status.err.Update(err)
log.Debugf("Network increment node peer %p %v count to: %d", peerNode, peerNode, peerNode.status.Error().Count())
if count := peerNode.status.Error().Count(); count == MaxPeerErrors {
log.Debugf("Network node peer %v count exceeded %d: %d", peerNode, MaxPeerErrors, peerNode.status.Error().Count())
n.PrunePeer(peerNode.id)
}
} }
return err return err
} }

View File

@ -26,6 +26,20 @@ var (
PruneTime = 90 * time.Second PruneTime = 90 * time.Second
) )
// Error is network node errors
type Error interface {
// Count is current count of errors
Count() int
// Msg is last error message
Msg() string
}
// Status is node status
type Status interface {
// Error reports error status
Error() Error
}
// Node is network node // Node is network node
type Node interface { type Node interface {
// Id is node id // Id is node id
@ -36,6 +50,8 @@ type Node interface {
Peers() []Node Peers() []Node
// Network is the network node is in // Network is the network node is in
Network() Network Network() Network
// Status returns node status
Status() Status
} }
// Network is micro network // Network is micro network

View File

@ -21,33 +21,62 @@ var (
ErrPeerNotFound = errors.New("peer not found") ErrPeerNotFound = errors.New("peer not found")
) )
type nodeError struct { // nerr tracks node errors
type nerr struct {
sync.RWMutex sync.RWMutex
count int count int
msg error
} }
// Increment increments node error count // Increment increments node error count
func (n *nodeError) Increment() { func (e *nerr) Update(err error) {
n.Lock() e.Lock()
defer n.Unlock() defer e.Unlock()
n.count++ e.count++
e.msg = err
} }
// Reset reset node error count // Count returns node error count
func (n *nodeError) Reset() { func (e *nerr) Count() int {
n.Lock() e.RLock()
defer n.Unlock() defer e.RUnlock()
n.count = 0 return e.count
} }
// GetCount returns node error count func (e *nerr) Msg() string {
func (n *nodeError) GetCount() int { e.RLock()
n.RLock() defer e.RUnlock()
defer n.RUnlock()
return n.count if e.msg != nil {
return e.msg.Error()
}
return ""
}
// status returns node status
type status struct {
sync.RWMutex
err *nerr
}
// newStatus creates
func newStatus() *status {
return &status{
err: new(nerr),
}
}
func (s *status) Error() Error {
s.RLock()
defer s.RUnlock()
return &nerr{
count: s.err.count,
msg: s.err.msg,
}
} }
// node is network node // node is network node
@ -67,8 +96,8 @@ type node struct {
lastSeen time.Time lastSeen time.Time
// lastSync keeps track of node last sync request // lastSync keeps track of node last sync request
lastSync time.Time lastSync time.Time
// err tracks node errors // err tracks node status
err nodeError status *status
} }
// Id is node ide // Id is node ide
@ -86,6 +115,19 @@ func (n *node) Network() Network {
return n.network return n.network
} }
// Status returns node status
func (n *node) Status() Status {
n.RLock()
defer n.RUnlock()
return &status{
err: &nerr{
count: n.status.err.count,
msg: n.status.err.msg,
},
}
}
// walk walks the node graph until some condition is met // walk walks the node graph until some condition is met
func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node { func (n *node) walk(until func(peer *node) bool, action func(parent, peer *node)) map[string]*node {
// track the visited nodes // track the visited nodes
@ -127,7 +169,28 @@ func (n *node) AddPeer(peer *node) error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
// get node topology: we need to check if the peer
// we are trying to add is already in our graph
top := n.getTopology(MaxDepth)
untilFoundPeer := func(n *node) bool {
return n.id == peer.id
}
justWalk := func(paent, node *node) {}
visited := top.walk(untilFoundPeer, justWalk)
peerNode, inTop := visited[peer.id]
if _, ok := n.peers[peer.id]; !ok { if _, ok := n.peers[peer.id]; !ok {
if inTop {
// just create a new edge to the existing peer
// but make sure you update the peer link
peerNode.link = peer.link
n.peers[peer.id] = peerNode
return nil
}
n.peers[peer.id] = peer n.peers[peer.id] = peer
return nil return nil
} }
@ -310,6 +373,7 @@ func (n *node) getTopology(depth uint) *node {
address: n.address, address: n.address,
peers: make(map[string]*node), peers: make(map[string]*node),
network: n.network, network: n.network,
status: n.status,
lastSeen: n.lastSeen, lastSeen: n.lastSeen,
} }
@ -361,6 +425,12 @@ func UnpackPeerTopology(pbPeer *pb.Peer, lastSeen time.Time, depth uint) *node {
id: pbPeer.Node.Id, id: pbPeer.Node.Id,
address: pbPeer.Node.Address, address: pbPeer.Node.Address,
peers: make(map[string]*node), peers: make(map[string]*node),
status: &status{
err: &nerr{
count: int(pbPeer.Node.Status.Error.Count),
msg: errors.New(pbPeer.Node.Status.Error.Msg),
},
},
lastSeen: lastSeen, lastSeen: lastSeen,
} }
@ -387,6 +457,12 @@ func peerProtoTopology(peer Node, depth uint) *pb.Peer {
node := &pb.Node{ node := &pb.Node{
Id: peer.Id(), Id: peer.Id(),
Address: peer.Address(), Address: peer.Address(),
Status: &pb.Status{
Error: &pb.Error{
Count: uint32(peer.Status().Error().Count()),
Msg: peer.Status().Error().Msg(),
},
},
} }
// set the network name if network is not nil // set the network name if network is not nil
@ -422,6 +498,12 @@ func PeersToProto(node Node, depth uint) *pb.Peer {
pbNode := &pb.Node{ pbNode := &pb.Node{
Id: node.Id(), Id: node.Id(),
Address: node.Address(), Address: node.Address(),
Status: &pb.Status{
Error: &pb.Error{
Count: uint32(node.Status().Error().Count()),
Msg: node.Status().Error().Msg(),
},
},
} }
// set the network name if network is not nil // set the network name if network is not nil

View File

@ -21,6 +21,7 @@ func testSetup() *node {
address: testNodeAddress, address: testNodeAddress,
peers: make(map[string]*node), peers: make(map[string]*node),
network: newNetwork(Name(testNodeNetName)), network: newNetwork(Name(testNodeNetName)),
err: new(nodeError),
} }
// add some peers to the node // add some peers to the node
@ -30,6 +31,7 @@ func testSetup() *node {
address: testNode.address + "-" + id, address: testNode.address + "-" + id,
peers: make(map[string]*node), peers: make(map[string]*node),
network: testNode.network, network: testNode.network,
err: new(nodeError),
} }
} }
@ -41,6 +43,7 @@ func testSetup() *node {
address: testNode.address + "-" + id, address: testNode.address + "-" + id,
peers: make(map[string]*node), peers: make(map[string]*node),
network: testNode.network, network: testNode.network,
err: new(nodeError),
} }
} }
@ -269,6 +272,7 @@ func TestUnpackPeerTopology(t *testing.T) {
Node: &pb.Node{ Node: &pb.Node{
Id: "newPeer", Id: "newPeer",
Address: "newPeerAddress", Address: "newPeerAddress",
Err: &pb.NodeError{},
}, },
Peers: make([]*pb.Peer, 0), Peers: make([]*pb.Peer, 0),
} }
@ -284,12 +288,14 @@ func TestUnpackPeerTopology(t *testing.T) {
pbPeer1Node := &pb.Node{ pbPeer1Node := &pb.Node{
Id: peer1.id, Id: peer1.id,
Address: peer1.address, Address: peer1.address,
Err: &pb.NodeError{},
} }
pbPeer111 := &pb.Peer{ pbPeer111 := &pb.Peer{
Node: &pb.Node{ Node: &pb.Node{
Id: "peer111", Id: "peer111",
Address: "peer111Address", Address: "peer111Address",
Err: &pb.NodeError{},
}, },
Peers: make([]*pb.Peer, 0), Peers: make([]*pb.Peer, 0),
} }
@ -298,6 +304,7 @@ func TestUnpackPeerTopology(t *testing.T) {
Node: &pb.Node{ Node: &pb.Node{
Id: "peer121", Id: "peer121",
Address: "peer121Address", Address: "peer121Address",
Err: &pb.NodeError{},
}, },
Peers: make([]*pb.Peer, 0), Peers: make([]*pb.Peer, 0),
} }
@ -324,6 +331,7 @@ func TestPeersToProto(t *testing.T) {
address: testNodeAddress, address: testNodeAddress,
peers: make(map[string]*node), peers: make(map[string]*node),
network: newNetwork(Name(testNodeNetName)), network: newNetwork(Name(testNodeNetName)),
err: &nodeError{},
} }
topCount := 0 topCount := 0

View File

@ -473,6 +473,164 @@ func (m *ServicesResponse) GetServices() []string {
return nil return nil
} }
type StatusRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatusRequest) Reset() { *m = StatusRequest{} }
func (m *StatusRequest) String() string { return proto.CompactTextString(m) }
func (*StatusRequest) ProtoMessage() {}
func (*StatusRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{11}
}
func (m *StatusRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatusRequest.Unmarshal(m, b)
}
func (m *StatusRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatusRequest.Marshal(b, m, deterministic)
}
func (m *StatusRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatusRequest.Merge(m, src)
}
func (m *StatusRequest) XXX_Size() int {
return xxx_messageInfo_StatusRequest.Size(m)
}
func (m *StatusRequest) XXX_DiscardUnknown() {
xxx_messageInfo_StatusRequest.DiscardUnknown(m)
}
var xxx_messageInfo_StatusRequest proto.InternalMessageInfo
type StatusResponse struct {
Status *Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatusResponse) Reset() { *m = StatusResponse{} }
func (m *StatusResponse) String() string { return proto.CompactTextString(m) }
func (*StatusResponse) ProtoMessage() {}
func (*StatusResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{12}
}
func (m *StatusResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatusResponse.Unmarshal(m, b)
}
func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic)
}
func (m *StatusResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatusResponse.Merge(m, src)
}
func (m *StatusResponse) XXX_Size() int {
return xxx_messageInfo_StatusResponse.Size(m)
}
func (m *StatusResponse) XXX_DiscardUnknown() {
xxx_messageInfo_StatusResponse.DiscardUnknown(m)
}
var xxx_messageInfo_StatusResponse proto.InternalMessageInfo
func (m *StatusResponse) GetStatus() *Status {
if m != nil {
return m.Status
}
return nil
}
// Error tracks network errors
type Error struct {
Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
Msg string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Error) Reset() { *m = Error{} }
func (m *Error) String() string { return proto.CompactTextString(m) }
func (*Error) ProtoMessage() {}
func (*Error) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{13}
}
func (m *Error) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Error.Unmarshal(m, b)
}
func (m *Error) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Error.Marshal(b, m, deterministic)
}
func (m *Error) XXX_Merge(src proto.Message) {
xxx_messageInfo_Error.Merge(m, src)
}
func (m *Error) XXX_Size() int {
return xxx_messageInfo_Error.Size(m)
}
func (m *Error) XXX_DiscardUnknown() {
xxx_messageInfo_Error.DiscardUnknown(m)
}
var xxx_messageInfo_Error proto.InternalMessageInfo
func (m *Error) GetCount() uint32 {
if m != nil {
return m.Count
}
return 0
}
func (m *Error) GetMsg() string {
if m != nil {
return m.Msg
}
return ""
}
// Status is node status
type Status struct {
Error *Error `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {}
func (*Status) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{14}
}
func (m *Status) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Status.Unmarshal(m, b)
}
func (m *Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Status.Marshal(b, m, deterministic)
}
func (m *Status) XXX_Merge(src proto.Message) {
xxx_messageInfo_Status.Merge(m, src)
}
func (m *Status) XXX_Size() int {
return xxx_messageInfo_Status.Size(m)
}
func (m *Status) XXX_DiscardUnknown() {
xxx_messageInfo_Status.DiscardUnknown(m)
}
var xxx_messageInfo_Status proto.InternalMessageInfo
func (m *Status) GetError() *Error {
if m != nil {
return m.Error
}
return nil
}
// Node is network node // Node is network node
type Node struct { type Node struct {
// node id // node id
@ -483,6 +641,8 @@ type Node struct {
Network string `protobuf:"bytes,3,opt,name=network,proto3" json:"network,omitempty"` Network string `protobuf:"bytes,3,opt,name=network,proto3" json:"network,omitempty"`
// associated metadata // associated metadata
Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// node status
Status *Status `protobuf:"bytes,5,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -492,7 +652,7 @@ func (m *Node) Reset() { *m = Node{} }
func (m *Node) String() string { return proto.CompactTextString(m) } func (m *Node) String() string { return proto.CompactTextString(m) }
func (*Node) ProtoMessage() {} func (*Node) ProtoMessage() {}
func (*Node) Descriptor() ([]byte, []int) { func (*Node) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{11} return fileDescriptor_8571034d60397816, []int{15}
} }
func (m *Node) XXX_Unmarshal(b []byte) error { func (m *Node) XXX_Unmarshal(b []byte) error {
@ -541,6 +701,13 @@ func (m *Node) GetMetadata() map[string]string {
return nil return nil
} }
func (m *Node) GetStatus() *Status {
if m != nil {
return m.Status
}
return nil
}
// Connect is sent when the node connects to the network // Connect is sent when the node connects to the network
type Connect struct { type Connect struct {
// network mode // network mode
@ -554,7 +721,7 @@ func (m *Connect) Reset() { *m = Connect{} }
func (m *Connect) String() string { return proto.CompactTextString(m) } func (m *Connect) String() string { return proto.CompactTextString(m) }
func (*Connect) ProtoMessage() {} func (*Connect) ProtoMessage() {}
func (*Connect) Descriptor() ([]byte, []int) { func (*Connect) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{12} return fileDescriptor_8571034d60397816, []int{16}
} }
func (m *Connect) XXX_Unmarshal(b []byte) error { func (m *Connect) XXX_Unmarshal(b []byte) error {
@ -595,7 +762,7 @@ func (m *Close) Reset() { *m = Close{} }
func (m *Close) String() string { return proto.CompactTextString(m) } func (m *Close) String() string { return proto.CompactTextString(m) }
func (*Close) ProtoMessage() {} func (*Close) ProtoMessage() {}
func (*Close) Descriptor() ([]byte, []int) { func (*Close) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{13} return fileDescriptor_8571034d60397816, []int{17}
} }
func (m *Close) XXX_Unmarshal(b []byte) error { func (m *Close) XXX_Unmarshal(b []byte) error {
@ -638,7 +805,7 @@ func (m *Peer) Reset() { *m = Peer{} }
func (m *Peer) String() string { return proto.CompactTextString(m) } func (m *Peer) String() string { return proto.CompactTextString(m) }
func (*Peer) ProtoMessage() {} func (*Peer) ProtoMessage() {}
func (*Peer) Descriptor() ([]byte, []int) { func (*Peer) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{14} return fileDescriptor_8571034d60397816, []int{18}
} }
func (m *Peer) XXX_Unmarshal(b []byte) error { func (m *Peer) XXX_Unmarshal(b []byte) error {
@ -688,7 +855,7 @@ func (m *Sync) Reset() { *m = Sync{} }
func (m *Sync) String() string { return proto.CompactTextString(m) } func (m *Sync) String() string { return proto.CompactTextString(m) }
func (*Sync) ProtoMessage() {} func (*Sync) ProtoMessage() {}
func (*Sync) Descriptor() ([]byte, []int) { func (*Sync) Descriptor() ([]byte, []int) {
return fileDescriptor_8571034d60397816, []int{15} return fileDescriptor_8571034d60397816, []int{19}
} }
func (m *Sync) XXX_Unmarshal(b []byte) error { func (m *Sync) XXX_Unmarshal(b []byte) error {
@ -735,6 +902,10 @@ func init() {
proto.RegisterType((*RoutesResponse)(nil), "go.micro.network.RoutesResponse") proto.RegisterType((*RoutesResponse)(nil), "go.micro.network.RoutesResponse")
proto.RegisterType((*ServicesRequest)(nil), "go.micro.network.ServicesRequest") proto.RegisterType((*ServicesRequest)(nil), "go.micro.network.ServicesRequest")
proto.RegisterType((*ServicesResponse)(nil), "go.micro.network.ServicesResponse") proto.RegisterType((*ServicesResponse)(nil), "go.micro.network.ServicesResponse")
proto.RegisterType((*StatusRequest)(nil), "go.micro.network.StatusRequest")
proto.RegisterType((*StatusResponse)(nil), "go.micro.network.StatusResponse")
proto.RegisterType((*Error)(nil), "go.micro.network.Error")
proto.RegisterType((*Status)(nil), "go.micro.network.Status")
proto.RegisterType((*Node)(nil), "go.micro.network.Node") proto.RegisterType((*Node)(nil), "go.micro.network.Node")
proto.RegisterMapType((map[string]string)(nil), "go.micro.network.Node.MetadataEntry") proto.RegisterMapType((map[string]string)(nil), "go.micro.network.Node.MetadataEntry")
proto.RegisterType((*Connect)(nil), "go.micro.network.Connect") proto.RegisterType((*Connect)(nil), "go.micro.network.Connect")
@ -746,43 +917,48 @@ func init() {
func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) } func init() { proto.RegisterFile("network.proto", fileDescriptor_8571034d60397816) }
var fileDescriptor_8571034d60397816 = []byte{ var fileDescriptor_8571034d60397816 = []byte{
// 593 bytes of a gzipped FileDescriptorProto // 678 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x5d, 0x6a, 0xdb, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xdd, 0x4e, 0xdb, 0x30,
0x10, 0x8e, 0x2c, 0x29, 0x3f, 0xd3, 0xc8, 0x75, 0x97, 0x92, 0x0a, 0x3d, 0xb4, 0xee, 0xe2, 0x87, 0x14, 0xa6, 0x6d, 0x52, 0xe0, 0x8c, 0x14, 0x66, 0x4d, 0x2c, 0xca, 0xc5, 0xe8, 0x2c, 0x2e, 0xd0,
0x50, 0x1a, 0x19, 0x12, 0x0a, 0xa5, 0xa6, 0x21, 0x10, 0x4a, 0xa1, 0x90, 0x90, 0xca, 0x17, 0xa8, 0x34, 0xd2, 0x09, 0x34, 0x6d, 0x1a, 0x1a, 0x42, 0x43, 0x68, 0xd2, 0x24, 0x10, 0x4b, 0x5f, 0x60,
0x6c, 0x0d, 0xb6, 0x49, 0xac, 0x75, 0x56, 0xeb, 0x04, 0x9f, 0xa0, 0x47, 0xe8, 0x99, 0x7a, 0xab, 0x21, 0xb1, 0x4a, 0x05, 0x8d, 0x8b, 0xe3, 0x80, 0xfa, 0x04, 0x7b, 0xd3, 0xbd, 0xc4, 0x6e, 0x26,
0xb2, 0xbb, 0x23, 0xc7, 0x8e, 0x65, 0x37, 0x79, 0xd3, 0xec, 0x7c, 0xdf, 0xcc, 0xce, 0xcc, 0x37, 0xdb, 0x27, 0x21, 0xa1, 0x49, 0x57, 0xee, 0x72, 0xec, 0xef, 0x3b, 0xc7, 0xe7, 0xef, 0x0b, 0x38,
0x2b, 0x08, 0x72, 0x54, 0xf7, 0x42, 0x5e, 0xc7, 0x13, 0x29, 0x94, 0x60, 0x8d, 0x81, 0x88, 0xc7, 0x09, 0x93, 0x0f, 0x5c, 0xdc, 0xf8, 0x53, 0xc1, 0x25, 0x27, 0x5b, 0x23, 0xee, 0x4f, 0xc6, 0x91,
0xa3, 0xbe, 0x14, 0x31, 0x9d, 0x47, 0x9d, 0xc1, 0x48, 0x0d, 0xa7, 0xbd, 0xb8, 0x2f, 0xc6, 0x6d, 0xe0, 0x3e, 0x9e, 0x7b, 0x47, 0xa3, 0xb1, 0xbc, 0xce, 0xae, 0xfc, 0x88, 0x4f, 0x06, 0xfa, 0x66,
0xe3, 0x69, 0x0f, 0xc4, 0x91, 0xfd, 0x90, 0x62, 0xaa, 0x50, 0xb6, 0x0b, 0x94, 0x77, 0xa3, 0x3e, 0x30, 0xe2, 0xfb, 0xe6, 0x43, 0xf0, 0x4c, 0x32, 0x31, 0x48, 0x99, 0xb8, 0x1f, 0x47, 0x6c, 0xa0,
0xb6, 0x4d, 0x04, 0x3a, 0xb4, 0xe1, 0xf8, 0x6f, 0x07, 0xfc, 0x9f, 0x53, 0x94, 0x33, 0x16, 0xc2, 0x3d, 0xe0, 0xa1, 0x71, 0x47, 0x7f, 0xb7, 0xc0, 0xfe, 0x99, 0x31, 0x31, 0x23, 0x2e, 0xac, 0x22,
0x0e, 0xe1, 0x42, 0xa7, 0xe9, 0x1c, 0xee, 0x25, 0xa5, 0xa9, 0x3d, 0x69, 0x96, 0x49, 0x2c, 0x8a, 0xce, 0x6d, 0xf5, 0x5b, 0x7b, 0xeb, 0x41, 0x6e, 0xaa, 0x9b, 0x30, 0x8e, 0x05, 0x4b, 0x53, 0xb7,
0xb0, 0x66, 0x3d, 0x64, 0x6a, 0xcf, 0x20, 0x55, 0x78, 0x9f, 0xce, 0x42, 0xd7, 0x7a, 0xc8, 0x64, 0x6d, 0x6e, 0xd0, 0x54, 0x37, 0xa3, 0x50, 0xb2, 0x87, 0x70, 0xe6, 0x76, 0xcc, 0x0d, 0x9a, 0x64,
0x07, 0xb0, 0x6d, 0xf3, 0x84, 0x9e, 0x71, 0x90, 0xa5, 0x19, 0x74, 0xef, 0xd0, 0xb7, 0x0c, 0x32, 0x1b, 0xba, 0x26, 0x8e, 0x6b, 0xe9, 0x0b, 0xb4, 0x14, 0x03, 0xdf, 0xed, 0xda, 0x86, 0x81, 0x26,
0xf9, 0x29, 0xd4, 0xcf, 0x45, 0x9e, 0x63, 0x5f, 0x25, 0x78, 0x3b, 0xc5, 0x42, 0xb1, 0x8f, 0xe0, 0x3d, 0x86, 0xde, 0x29, 0x4f, 0x12, 0x16, 0xc9, 0x80, 0xdd, 0x65, 0x2c, 0x95, 0xe4, 0x3d, 0xd8,
0xe7, 0x22, 0xc3, 0x22, 0x74, 0x9a, 0xee, 0xe1, 0x8b, 0xe3, 0x83, 0xf8, 0x71, 0xe9, 0xf1, 0xa5, 0x09, 0x8f, 0x59, 0xea, 0xb6, 0xfa, 0x9d, 0xbd, 0x17, 0x07, 0xdb, 0xfe, 0xd3, 0xd4, 0xfd, 0x0b,
0xc8, 0x30, 0xb1, 0x20, 0xfe, 0x0a, 0x5e, 0xce, 0xf9, 0xc5, 0x44, 0xe4, 0x05, 0xf2, 0x16, 0xec, 0x1e, 0xb3, 0xc0, 0x80, 0xe8, 0x4b, 0xd8, 0x2c, 0xf8, 0xe9, 0x94, 0x27, 0x29, 0xa3, 0xbb, 0xb0,
0x6b, 0x44, 0x51, 0x06, 0x7c, 0x0d, 0x7e, 0x86, 0x13, 0x35, 0x34, 0x05, 0x06, 0x89, 0x35, 0xf8, 0xa1, 0x10, 0x69, 0xee, 0xf0, 0x15, 0xd8, 0x31, 0x9b, 0xca, 0x6b, 0x9d, 0xa0, 0x13, 0x18, 0x83,
0x57, 0x08, 0x08, 0x65, 0x69, 0xcf, 0xcc, 0xdb, 0x82, 0xfd, 0xef, 0x32, 0x9d, 0x0c, 0x37, 0x27, 0x7e, 0x05, 0x07, 0x51, 0x86, 0xf6, 0xcc, 0xb8, 0xbb, 0xb0, 0xf1, 0x5d, 0x84, 0xd3, 0xeb, 0xc5,
0xe9, 0x40, 0x40, 0x28, 0x4a, 0xf2, 0x01, 0x3c, 0x29, 0x84, 0x32, 0xa8, 0xca, 0x1c, 0x57, 0x88, 0x41, 0x8e, 0xc0, 0x41, 0x14, 0x06, 0x79, 0x07, 0x96, 0xe0, 0x5c, 0x6a, 0x54, 0x6d, 0x8c, 0x4b,
0x32, 0x31, 0x18, 0x7e, 0x0a, 0x41, 0xa2, 0xdb, 0x37, 0x2f, 0xe4, 0x08, 0xfc, 0x5b, 0x3d, 0x34, 0xc6, 0x44, 0xa0, 0x31, 0xf4, 0x18, 0x9c, 0x40, 0x95, 0xaf, 0x48, 0x64, 0x1f, 0xec, 0x3b, 0xd5,
0x62, 0xbf, 0x59, 0x65, 0x9b, 0x99, 0x26, 0x16, 0xc5, 0xcf, 0xa0, 0x5e, 0xf2, 0x29, 0x7b, 0x4c, 0x34, 0x64, 0xbf, 0x9e, 0x67, 0xeb, 0x9e, 0x06, 0x06, 0x45, 0x4f, 0xa0, 0x97, 0xf3, 0x31, 0xba,
0xe3, 0xa9, 0xa8, 0x91, 0xe4, 0x61, 0x08, 0x34, 0x36, 0xd3, 0xdc, 0xae, 0x55, 0x43, 0x79, 0x07, 0x8f, 0xed, 0xa9, 0xc9, 0x11, 0xc7, 0x43, 0x13, 0xb0, 0x6d, 0xba, 0xb8, 0x43, 0x33, 0x0d, 0xf9,
0x1e, 0x43, 0xe3, 0xe1, 0x88, 0xc2, 0x46, 0xb0, 0x4b, 0xa2, 0xb1, 0x81, 0xf7, 0x92, 0xb9, 0xcd, 0x1b, 0xa8, 0x0f, 0x5b, 0x8f, 0x47, 0xe8, 0xd6, 0x83, 0x35, 0x1c, 0x1a, 0xe3, 0x78, 0x3d, 0x28,
0xff, 0x3a, 0xe0, 0xe9, 0xbe, 0xb1, 0x3a, 0xd4, 0x46, 0x19, 0x69, 0xac, 0x36, 0xca, 0x36, 0xcb, 0x6c, 0xba, 0x09, 0xce, 0x50, 0x86, 0x32, 0x2b, 0x1c, 0x7c, 0x83, 0x5e, 0x7e, 0x80, 0xf4, 0x0f,
0xab, 0x14, 0x8b, 0xbb, 0x24, 0x16, 0x76, 0x06, 0xbb, 0x63, 0x54, 0x69, 0x96, 0xaa, 0x34, 0xf4, 0xd0, 0x4d, 0xf5, 0x09, 0xe6, 0xe5, 0xce, 0xe7, 0x85, 0x0c, 0xc4, 0xd1, 0x01, 0xd8, 0x67, 0x42,
0x4c, 0x05, 0xad, 0xea, 0x29, 0xc5, 0x17, 0x04, 0xfb, 0x96, 0x2b, 0x39, 0x4b, 0xe6, 0xac, 0xa8, 0x70, 0xa1, 0xaa, 0x1e, 0xf1, 0x2c, 0x91, 0x79, 0xd5, 0xb5, 0x41, 0xb6, 0xa0, 0x33, 0x49, 0x47,
0x03, 0xc1, 0x92, 0x8b, 0x35, 0xc0, 0xbd, 0xc6, 0x19, 0xdd, 0x4b, 0x7f, 0xea, 0x49, 0xde, 0xa5, 0x38, 0xb5, 0xea, 0x93, 0x7e, 0x82, 0xae, 0x71, 0xa1, 0x6a, 0xc8, 0x14, 0xb5, 0xb9, 0x86, 0xda,
0x37, 0x53, 0xa4, 0x6b, 0x59, 0xe3, 0x4b, 0xed, 0xb3, 0xc3, 0x3f, 0xc1, 0x0e, 0x69, 0x4d, 0xcf, 0x73, 0x60, 0x50, 0xf4, 0x6f, 0x0b, 0x2c, 0xd5, 0x76, 0xd2, 0x83, 0xf6, 0x38, 0xc6, 0x15, 0x69,
0x51, 0xeb, 0x60, 0xfd, 0x1c, 0x8d, 0x56, 0x0c, 0x86, 0x9f, 0x80, 0x7f, 0x7e, 0x23, 0xec, 0xf0, 0x8f, 0xe3, 0xc5, 0xdb, 0x91, 0xcf, 0x7a, 0xa7, 0x32, 0xeb, 0xe4, 0x04, 0xd6, 0x26, 0x4c, 0x86,
0x9f, 0x4c, 0xfa, 0x05, 0x9e, 0x96, 0xc2, 0x73, 0x38, 0x5a, 0xc1, 0x13, 0x44, 0xa9, 0x1b, 0xea, 0x71, 0x28, 0x43, 0xd7, 0xd2, 0x0d, 0xd8, 0xad, 0x1f, 0x32, 0xff, 0x1c, 0x61, 0x67, 0x89, 0x14,
0x6e, 0x50, 0x97, 0x05, 0xf1, 0x1e, 0x78, 0xdd, 0x59, 0xde, 0xd7, 0x19, 0xf4, 0xc1, 0xff, 0x24, 0xb3, 0xa0, 0x60, 0x95, 0x4a, 0x65, 0x2f, 0x57, 0x2a, 0xef, 0x08, 0x9c, 0x8a, 0x33, 0x55, 0x9c,
0xa9, 0x31, 0x0b, 0x02, 0xaa, 0x3d, 0x45, 0x40, 0xc7, 0x7f, 0x5c, 0xd8, 0xb9, 0xa4, 0xe1, 0x5d, 0x1b, 0x36, 0xc3, 0x4c, 0xd4, 0xa7, 0x2a, 0xe2, 0x7d, 0x78, 0x9b, 0x31, 0x4c, 0xc4, 0x18, 0x5f,
0x3d, 0x74, 0xaf, 0xb9, 0x9a, 0x64, 0xf9, 0x11, 0x88, 0xde, 0x6f, 0x40, 0xd0, 0x9a, 0x6f, 0xb1, 0xda, 0x9f, 0x5b, 0xf4, 0x23, 0xac, 0xe2, 0x72, 0xa9, 0xc1, 0x55, 0x83, 0xdf, 0x3c, 0xb8, 0x7a,
0x1f, 0xe0, 0x9b, 0xed, 0x62, 0x6f, 0x57, 0xd1, 0x8b, 0xcb, 0x19, 0xbd, 0x5b, 0xeb, 0x5f, 0x8c, 0x39, 0x34, 0x86, 0x1e, 0x82, 0x7d, 0x7a, 0xcb, 0xcd, 0xb4, 0x2f, 0x4d, 0xfa, 0x05, 0x96, 0x9a,
0x65, 0x9e, 0x83, 0xaa, 0x58, 0x8b, 0xaf, 0x49, 0x55, 0xac, 0xa5, 0x77, 0x84, 0x6f, 0xb1, 0x0b, 0xfd, 0xe7, 0x70, 0xd4, 0xca, 0x4e, 0x19, 0x13, 0xaa, 0x05, 0x9d, 0x05, 0xeb, 0x64, 0x40, 0xf4,
0xd8, 0xb6, 0x8b, 0xc7, 0x2a, 0xc0, 0x4b, 0x2b, 0x1d, 0x35, 0xd7, 0x03, 0xe6, 0xe1, 0xba, 0xb0, 0x0a, 0xac, 0xe1, 0x2c, 0x89, 0x54, 0x04, 0x75, 0xf0, 0xbf, 0x1d, 0x54, 0x98, 0xd2, 0xc6, 0xb4,
0x5b, 0xae, 0x1c, 0xab, 0xe8, 0xcb, 0xa3, 0x0d, 0x8d, 0xf8, 0x26, 0x48, 0x19, 0xb4, 0xb7, 0x6d, 0x97, 0xd9, 0x98, 0x83, 0x3f, 0x1d, 0x58, 0xbd, 0xc0, 0x76, 0x5f, 0x3e, 0x56, 0xaf, 0x3f, 0x1f,
0x7e, 0x04, 0x27, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x66, 0x66, 0x42, 0x5f, 0x68, 0x06, 0x00, 0xa4, 0xaa, 0x7a, 0xde, 0xdb, 0x05, 0x08, 0xd4, 0xb5, 0x15, 0xf2, 0x03, 0x6c, 0x2d, 0x27, 0xe4,
0x00, 0xcd, 0x3c, 0xba, 0xac, 0x46, 0xde, 0x4e, 0xe3, 0x7d, 0xd9, 0x97, 0xd6, 0xbf, 0x3a, 0x5f, 0x65,
0xf9, 0xac, 0xf3, 0x55, 0x11, 0x4e, 0xba, 0x42, 0xce, 0xa1, 0x6b, 0x94, 0x86, 0xd4, 0x80, 0x2b,
0x1a, 0xe6, 0xf5, 0x9b, 0x01, 0x85, 0xbb, 0x21, 0xac, 0xe5, 0x1a, 0x43, 0x6a, 0xea, 0xf2, 0x44,
0x92, 0x3c, 0xba, 0x08, 0x52, 0x7e, 0x23, 0x4a, 0xc0, 0x4e, 0xe3, 0xd2, 0x34, 0xbf, 0xb1, 0x2a,
0x59, 0x74, 0xe5, 0xaa, 0xab, 0x7f, 0xa4, 0x87, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x11, 0x45,
0xe6, 0xf4, 0xa8, 0x07, 0x00, 0x00,
} }

View File

@ -45,6 +45,8 @@ type NetworkService interface {
Routes(ctx context.Context, in *RoutesRequest, opts ...client.CallOption) (*RoutesResponse, error) Routes(ctx context.Context, in *RoutesRequest, opts ...client.CallOption) (*RoutesResponse, error)
// Returns a list of known services based on routes // Returns a list of known services based on routes
Services(ctx context.Context, in *ServicesRequest, opts ...client.CallOption) (*ServicesResponse, error) Services(ctx context.Context, in *ServicesRequest, opts ...client.CallOption) (*ServicesResponse, error)
// Status returns network status
Status(ctx context.Context, in *StatusRequest, opts ...client.CallOption) (*StatusResponse, error)
} }
type networkService struct { type networkService struct {
@ -115,6 +117,16 @@ func (c *networkService) Services(ctx context.Context, in *ServicesRequest, opts
return out, nil return out, nil
} }
func (c *networkService) Status(ctx context.Context, in *StatusRequest, opts ...client.CallOption) (*StatusResponse, error) {
req := c.c.NewRequest(c.name, "Network.Status", in)
out := new(StatusResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Network service // Server API for Network service
type NetworkHandler interface { type NetworkHandler interface {
@ -128,6 +140,8 @@ type NetworkHandler interface {
Routes(context.Context, *RoutesRequest, *RoutesResponse) error Routes(context.Context, *RoutesRequest, *RoutesResponse) error
// Returns a list of known services based on routes // Returns a list of known services based on routes
Services(context.Context, *ServicesRequest, *ServicesResponse) error Services(context.Context, *ServicesRequest, *ServicesResponse) error
// Status returns network status
Status(context.Context, *StatusRequest, *StatusResponse) error
} }
func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error { func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server.HandlerOption) error {
@ -137,6 +151,7 @@ func RegisterNetworkHandler(s server.Server, hdlr NetworkHandler, opts ...server
Nodes(ctx context.Context, in *NodesRequest, out *NodesResponse) error Nodes(ctx context.Context, in *NodesRequest, out *NodesResponse) error
Routes(ctx context.Context, in *RoutesRequest, out *RoutesResponse) error Routes(ctx context.Context, in *RoutesRequest, out *RoutesResponse) error
Services(ctx context.Context, in *ServicesRequest, out *ServicesResponse) error Services(ctx context.Context, in *ServicesRequest, out *ServicesResponse) error
Status(ctx context.Context, in *StatusRequest, out *StatusResponse) error
} }
type Network struct { type Network struct {
network network
@ -168,3 +183,7 @@ func (h *networkHandler) Routes(ctx context.Context, in *RoutesRequest, out *Rou
func (h *networkHandler) Services(ctx context.Context, in *ServicesRequest, out *ServicesResponse) error { func (h *networkHandler) Services(ctx context.Context, in *ServicesRequest, out *ServicesResponse) error {
return h.NetworkHandler.Services(ctx, in, out) return h.NetworkHandler.Services(ctx, in, out)
} }
func (h *networkHandler) Status(ctx context.Context, in *StatusRequest, out *StatusResponse) error {
return h.NetworkHandler.Status(ctx, in, out)
}

View File

@ -16,6 +16,8 @@ service Network {
rpc Routes(RoutesRequest) returns (RoutesResponse) {}; rpc Routes(RoutesRequest) returns (RoutesResponse) {};
// Returns a list of known services based on routes // Returns a list of known services based on routes
rpc Services(ServicesRequest) returns (ServicesResponse) {}; rpc Services(ServicesRequest) returns (ServicesResponse) {};
// Status returns network status
rpc Status(StatusRequest) returns (StatusResponse) {};
} }
// Query is passed in a LookupRequest // Query is passed in a LookupRequest
@ -69,6 +71,23 @@ message ServicesResponse {
repeated string services = 1; repeated string services = 1;
} }
message StatusRequest {}
message StatusResponse {
Status status = 1;
}
// Error tracks network errors
message Error {
uint32 count = 1;
string msg = 2;
}
// Status is node status
message Status {
Error error = 1;
}
// Node is network node // Node is network node
message Node { message Node {
// node id // node id
@ -79,6 +98,8 @@ message Node {
string network = 3; string network = 3;
// associated metadata // associated metadata
map<string,string> metadata = 4; map<string,string> metadata = 4;
// node status
Status status = 5;
} }
// Connect is sent when the node connects to the network // Connect is sent when the node connects to the network