Added ControlChannel tunnel.Listener to process incoming messages

This commit is contained in:
Milos Gajdos 2019-08-22 13:17:32 +01:00
parent db89fc4efe
commit e53484302c
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F

View File

@ -136,52 +136,80 @@ func (n *network) resolve() {
} }
} }
func (n *network) process(client transport.Client) { func (n *network) handleConn(conn tunnel.Conn, msg chan *transport.Message) {
for { for {
m := new(transport.Message) m := new(transport.Message)
if err := client.Recv(m); err != nil { if err := conn.Recv(m); err != nil {
// TODO: should we bail here? // TODO: should we bail here?
log.Debugf("Network advert receive error: %v", err) log.Debugf("Network tunnel advert receive error: %v", err)
return return
} }
// switch on type of message and take action select {
switch m.Header["Micro-Method"] { case msg <- m:
case "advert": case <-n.closed:
pbAdvert := &pb.Advert{} return
if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { }
continue }
} }
var events []*router.Event func (n *network) process(l tunnel.Listener) {
for _, event := range pbAdvert.Events { // receive control message queue
route := router.Route{ recv := make(chan *transport.Message, 128)
Service: event.Route.Service,
Address: event.Route.Address, // accept a connection
Gateway: event.Route.Gateway, conn, err := l.Accept()
Network: event.Route.Network, if err != nil {
Link: event.Route.Link, // TODO: handle this
Metric: int(event.Route.Metric), log.Debugf("Network tunnel accept error: %v", err)
return
}
go n.handleConn(conn, recv)
for {
select {
case m := <-recv:
// switch on type of message and take action
switch m.Header["Micro-Method"] {
case "advert":
pbAdvert := &pb.Advert{}
if err := proto.Unmarshal(m.Body, pbAdvert); err != nil {
continue
} }
e := &router.Event{
Type: router.EventType(event.Type), var events []*router.Event
for _, event := range pbAdvert.Events {
route := router.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Link: event.Route.Link,
Metric: int(event.Route.Metric),
}
e := &router.Event{
Type: router.EventType(event.Type),
Timestamp: time.Unix(0, pbAdvert.Timestamp),
Route: route,
}
events = append(events, e)
}
advert := &router.Advert{
Id: pbAdvert.Id,
Type: router.AdvertType(pbAdvert.Type),
Timestamp: time.Unix(0, pbAdvert.Timestamp), Timestamp: time.Unix(0, pbAdvert.Timestamp),
Route: route, TTL: time.Duration(pbAdvert.Ttl),
Events: events,
} }
events = append(events, e)
}
advert := &router.Advert{
Id: pbAdvert.Id,
Type: router.AdvertType(pbAdvert.Type),
Timestamp: time.Unix(0, pbAdvert.Timestamp),
TTL: time.Duration(pbAdvert.Ttl),
Events: events,
}
if err := n.Router.Process(advert); err != nil { if err := n.Router.Process(advert); err != nil {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err) log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
continue continue
}
} }
case <-n.closed:
return
} }
} }
} }
@ -268,7 +296,12 @@ func (n *network) Connect() error {
// dial into ControlChannel to send route adverts // dial into ControlChannel to send route adverts
client, err := n.Tunnel.Dial(ControlChannel) client, err := n.Tunnel.Dial(ControlChannel)
if err != nil { if err != nil {
// TODO: should we stop the tunnel here? return err
}
// listen on ControlChannel
listener, err := n.Tunnel.Listen(ControlChannel)
if err != nil {
return err return err
} }
@ -291,8 +324,8 @@ func (n *network) Connect() error {
// advertise routes // advertise routes
go n.advertise(client, advertChan) go n.advertise(client, advertChan)
// process routes // accept and process routes
go n.process(client) go n.process(listener)
// start the server // start the server
if err := n.srv.Start(); err != nil { if err := n.srv.Start(); err != nil {