From 35a1de91a982a9be8cda52ba5126684a77fb9c3a Mon Sep 17 00:00:00 2001
From: Milos Gajdos <milosgajdos83@gmail.com>
Date: Thu, 11 Jul 2019 12:36:39 +0100
Subject: [PATCH] Advertise full table every minute.

---
 network/router/default.go       | 139 ++++++++++++++++++++++----------
 network/router/router.go        |   3 +-
 network/router/table/watcher.go |  16 ++++
 3 files changed, 115 insertions(+), 43 deletions(-)

diff --git a/network/router/default.go b/network/router/default.go
index 6250608a..5b13b2ae 100644
--- a/network/router/default.go
+++ b/network/router/default.go
@@ -13,14 +13,16 @@ import (
 )
 
 const (
-	// AdvertiseTick is time interval in which we advertise route updates
-	AdvertiseTick = 5 * time.Second
+	// AdvertiseEventsTick is time interval in which the router advertises route updates
+	AdvertiseEventsTick = 5 * time.Second
+	// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
+	AdvertiseTableTick = 1 * time.Minute
 	// AdvertSuppress is advert suppression threshold
 	AdvertSuppress = 2000
 	// AdvertRecover is advert recovery threshold
 	AdvertRecover = 750
 	// DefaultAdvertTTL is default advertisement TTL
-	DefaultAdvertTTL = time.Minute
+	DefaultAdvertTTL = 1 * time.Minute
 	// PenaltyDecay is the penalty decay
 	PenaltyDecay = 1.15
 	// Delete penalises route addition and deletion
@@ -78,6 +80,28 @@ func (r *router) Options() Options {
 	return r.opts
 }
 
+// manageRoute applies route action on the routing table
+func (r *router) manageRoute(route table.Route, action string) error {
+	switch action {
+	case "create":
+		if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute {
+			return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
+		}
+	case "update":
+		if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute {
+			return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
+		}
+	case "delete":
+		if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound {
+			return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
+		}
+	default:
+		return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action)
+	}
+
+	return nil
+}
+
 // manageServiceRoutes manages routes for a given service.
 // It returns error of the routing table action fails.
 func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
@@ -95,21 +119,8 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
 			Metric:  table.DefaultLocalMetric,
 		}
 
-		switch action {
-		case "create":
-			if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute {
-				return fmt.Errorf("failed adding route for service %s: %s", service.Name, err)
-			}
-		case "update":
-			if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute {
-				return fmt.Errorf("failed updating route for service %s: %s", service.Name, err)
-			}
-		case "delete":
-			if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound {
-				return fmt.Errorf("failed deleting route for service %s: %s", service.Name, err)
-			}
-		default:
-			return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", service.Name, action)
+		if err := r.manageRoute(route, action); err != nil {
+			return err
 		}
 	}
 
@@ -132,8 +143,8 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
 			continue
 		}
 		// manage the routes for all returned services
-		for _, s := range srvs {
-			if err := r.manageServiceRoutes(s, action); err != nil {
+		for _, srv := range srvs {
+			if err := r.manageServiceRoutes(srv, action); err != nil {
 				return err
 			}
 		}
@@ -210,12 +221,14 @@ func (r *router) watchTable(w table.Watcher) error {
 	return watchErr
 }
 
-func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
+// advertiseEvents advertises events to event subscribers
+func (r *router) advertiseEvents(advType AdvertType, events []*table.Event) {
 	defer r.advertWg.Done()
 
 	a := &Advert{
 		Id:        r.opts.Id,
 		Type:      advType,
+		TTL:       DefaultAdvertTTL,
 		Timestamp: time.Now(),
 		Events:    events,
 	}
@@ -225,7 +238,45 @@ func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
 	case <-r.exit:
 		return
 	}
+}
 
+// advertiseTable advertises the whole routing table to the network
+func (r *router) advertiseTable() error {
+	// create table advertisement ticker
+	ticker := time.NewTicker(AdvertiseTableTick)
+
+	for {
+		select {
+		case <-ticker.C:
+			// list routing table routes to announce
+			routes, err := r.List()
+			if err != nil {
+				return fmt.Errorf("failed listing routes: %s", err)
+			}
+			// collect all the added routes before we attempt to add default gateway
+			events := make([]*table.Event, len(routes))
+			for i, route := range routes {
+				event := &table.Event{
+					Type:      table.Update,
+					Timestamp: time.Now(),
+					Route:     route,
+				}
+				events[i] = event
+			}
+
+			// advertise all routes as Update events to subscribers
+			if len(events) > 0 {
+				go func() {
+					r.advertWg.Add(1)
+					r.advertiseEvents(Update, events)
+				}()
+			}
+		case <-r.exit:
+			return nil
+		}
+	}
+
+	return nil
 }
 
 // isFlapping detects if the event is flapping based on the current and previous event status.
@@ -241,8 +292,8 @@ func isFlapping(curr, prev *table.Event) bool {
 	return false
 }
 
-// updateEvent is a table event enriched with advertisement data
-type updateEvent struct {
+// advertEvent is a table event enriched with advertisement data
+type advertEvent struct {
 	*table.Event
 	// timestamp marks the time the event has been received
 	timestamp time.Time
@@ -258,15 +309,16 @@ type updateEvent struct {
 // It suppresses unhealthy flapping events and advertises healthy events upstream.
 func (r *router) processEvents() error {
 	// ticker to periodically scan event for advertising
-	ticker := time.NewTicker(AdvertiseTick)
+	ticker := time.NewTicker(AdvertiseEventsTick)
 	// eventMap is a map of advert events
-	eventMap := make(map[uint64]*updateEvent)
+	eventMap := make(map[uint64]*advertEvent)
 
 	for {
 		select {
 		case <-ticker.C:
 			var events []*table.Event
 			// collect all events which are not flapping
+			// TODO: decay the events and update suppression
 			for key, event := range eventMap {
 				if !event.isFlapping && !event.isSuppressed {
 					e := new(table.Event)
@@ -277,9 +329,10 @@ func (r *router) processEvents() error {
 				}
 			}
 
+			// advertise all Update events to subscribers
 			if len(events) > 0 {
 				r.advertWg.Add(1)
-				go r.advertEvents(Update, events)
+				go r.advertiseEvents(Update, events)
 			}
 		case e := <-r.eventChan:
 			// event timestamp
@@ -301,7 +354,7 @@ func (r *router) processEvents() error {
 			hash := e.Route.Hash()
 			event, ok := eventMap[hash]
 			if !ok {
-				event = &updateEvent{
+				event = &advertEvent{
 					Event:     e,
 					penalty:   penalty,
 					timestamp: time.Now(),
@@ -334,7 +387,7 @@ func (r *router) processEvents() error {
 		}
 	}
 
-	// we probably never reach this place
+	// we probably never reach this code path
 
 	return nil
 }
@@ -438,7 +491,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
 		}
 
 		// error channel collecting goroutine errors
-		errChan := make(chan error, 3)
+		errChan := make(chan error, 4)
 
 		r.wg.Add(1)
 		go func() {
@@ -457,24 +510,27 @@ func (r *router) Advertise() (<-chan *Advert, error) {
 		r.wg.Add(1)
 		go func() {
 			defer r.wg.Done()
-			// listen to routing table events and process them
+			// watch routing table events and process them
 			errChan <- r.processEvents()
 		}()
 
+		r.advertWg.Add(1)
+		go func() {
+			defer r.advertWg.Done()
+			// advertise the whole routing table
+			errChan <- r.advertiseTable()
+		}()
+
+		// advertise your presence
+		r.advertWg.Add(1)
+		go r.advertiseEvents(Announce, events)
+
 		// watch for errors and cleanup
 		r.wg.Add(1)
 		go r.watchErrors(errChan)
 
-		// advertise your presence
-		r.advertWg.Add(1)
-		go r.advertEvents(Announce, events)
-
 		// mark router as running and set its Error to nil
-		status := Status{
-			Code:  Running,
-			Error: nil,
-		}
-		r.status = status
+		r.status = Status{Code: Running, Error: nil}
 	}
 
 	return r.advertChan, nil
@@ -494,8 +550,9 @@ func (r *router) Process(a *Advert) error {
 	for _, event := range events {
 		// create a copy of the route
 		route := event.Route
-		if err := r.Update(route); err != nil {
-			return fmt.Errorf("failed updating routing table: %v", err)
+		action := event.Type
+		if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil {
+			return fmt.Errorf("failed applying action %s to routing table: %s", action, err)
 		}
 	}
 
diff --git a/network/router/router.go b/network/router/router.go
index 39e8d94a..724f38be 100644
--- a/network/router/router.go
+++ b/network/router/router.go
@@ -58,8 +58,7 @@ type Advert struct {
 	// Timestamp marks the time when the update is sent
 	Timestamp time.Time
 	// TTL is Advert TTL
-	// TODO: not used
-	TTL time.Time
+	TTL time.Duration
 	// Events is a list of routing table events to advertise
 	Events []*table.Event
 }
diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go
index 0f8fab7f..503993ff 100644
--- a/network/router/table/watcher.go
+++ b/network/router/table/watcher.go
@@ -22,6 +22,22 @@ const (
 	Update
 )
 
+// String implements fmt.Stringer
+// NOTE: we need this as this makes converting the numeric codes
+// into miro style string actions very simple
+func (et EventType) String() string {
+	switch et {
+	case Create:
+		return "create"
+	case Delete:
+		return "delete"
+	case Update:
+		return "update"
+	default:
+		return "unknown"
+	}
+}
+
 // Event is returned by a call to Next on the watcher.
 type Event struct {
 	// Type defines type of event