diff --git a/network/default.go b/network/default.go index 56f82a17..bab14f13 100644 --- a/network/default.go +++ b/network/default.go @@ -136,52 +136,80 @@ func (n *network) resolve() { } } -func (n *network) process(client transport.Client) { +func (n *network) handleConn(conn tunnel.Conn, msg chan *transport.Message) { for { m := new(transport.Message) - if err := client.Recv(m); err != nil { + if err := conn.Recv(m); err != nil { // TODO: should we bail here? - log.Debugf("Network advert receive error: %v", err) + log.Debugf("Network tunnel advert receive error: %v", err) return } - // switch on type of message and take action - switch m.Header["Micro-Method"] { - case "advert": - pbAdvert := &pb.Advert{} - if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { - continue - } + select { + case msg <- m: + case <-n.closed: + return + } + } +} - var events []*router.Event - for _, event := range pbAdvert.Events { - route := router.Route{ - Service: event.Route.Service, - Address: event.Route.Address, - Gateway: event.Route.Gateway, - Network: event.Route.Network, - Link: event.Route.Link, - Metric: int(event.Route.Metric), +func (n *network) process(l tunnel.Listener) { + // receive control message queue + recv := make(chan *transport.Message, 128) + + // accept a connection + conn, err := l.Accept() + if err != nil { + // TODO: handle this + log.Debugf("Network tunnel accept error: %v", err) + return + } + + go n.handleConn(conn, recv) + + for { + select { + case m := <-recv: + // switch on type of message and take action + switch m.Header["Micro-Method"] { + case "advert": + pbAdvert := &pb.Advert{} + if err := proto.Unmarshal(m.Body, pbAdvert); err != nil { + continue } - e := &router.Event{ - Type: router.EventType(event.Type), + + var events []*router.Event + for _, event := range pbAdvert.Events { + route := router.Route{ + Service: event.Route.Service, + Address: event.Route.Address, + Gateway: event.Route.Gateway, + Network: event.Route.Network, + Link: event.Route.Link, + Metric: int(event.Route.Metric), + } + e := &router.Event{ + Type: router.EventType(event.Type), + Timestamp: time.Unix(0, pbAdvert.Timestamp), + Route: route, + } + events = append(events, e) + } + advert := &router.Advert{ + Id: pbAdvert.Id, + Type: router.AdvertType(pbAdvert.Type), Timestamp: time.Unix(0, pbAdvert.Timestamp), - Route: route, + TTL: time.Duration(pbAdvert.Ttl), + Events: events, } - events = append(events, e) - } - advert := &router.Advert{ - Id: pbAdvert.Id, - Type: router.AdvertType(pbAdvert.Type), - Timestamp: time.Unix(0, pbAdvert.Timestamp), - TTL: time.Duration(pbAdvert.Ttl), - Events: events, - } - if err := n.Router.Process(advert); err != nil { - log.Debugf("Network failed to process advert %s: %v", advert.Id, err) - continue + if err := n.Router.Process(advert); err != nil { + log.Debugf("Network failed to process advert %s: %v", advert.Id, err) + continue + } } + case <-n.closed: + return } } } @@ -268,7 +296,12 @@ func (n *network) Connect() error { // dial into ControlChannel to send route adverts client, err := n.Tunnel.Dial(ControlChannel) if err != nil { - // TODO: should we stop the tunnel here? + return err + } + + // listen on ControlChannel + listener, err := n.Tunnel.Listen(ControlChannel) + if err != nil { return err } @@ -291,8 +324,8 @@ func (n *network) Connect() error { // advertise routes go n.advertise(client, advertChan) - // process routes - go n.process(client) + // accept and process routes + go n.process(listener) // start the server if err := n.srv.Start(); err != nil {