diff --git a/network/router/default_router.go b/network/router/default_router.go index ae55ee9f..18544b6e 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" @@ -12,8 +13,8 @@ import ( // router provides default router implementation type router struct { opts Options - started bool - advertChan chan *Advertisement + running bool + advertChan chan *Update exit chan struct{} wg *sync.WaitGroup } @@ -30,8 +31,8 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, - started: false, - advertChan: make(chan *Advertisement), + running: false, + advertChan: make(chan *Update), exit: make(chan struct{}), wg: &sync.WaitGroup{}, } @@ -72,8 +73,8 @@ func (r *router) Network() string { // Advertise advertises the routes to the network. // It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() (<-chan *Advertisement, error) { - if !r.started { +func (r *router) Advertise() (<-chan *Update, error) { + if !r.running { // add local service routes into the routing table if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) @@ -135,17 +136,19 @@ func (r *router) Advertise() (<-chan *Advertisement, error) { // close the advertise channel close(r.advertChan) + // mark the router as stopped + r.running = false }() - // mark the router as started - r.started = true + // mark the router as running + r.running = true } return r.advertChan, nil } // Update updates the routing table using the advertised values -func (r *router) Update(a *Advertisement) error { +func (r *router) Update(a *Update) error { // we extract the route from advertisement and update the routing table route := Route{ Destination: a.Event.Route.Destination, @@ -279,9 +282,16 @@ func (r *router) watchTable(w Watcher) error { break } - r.advertChan <- &Advertisement{ - ID: r.ID(), - Event: event, + u := &Update{ + ID: r.ID(), + Timestamp: time.Now(), + Event: event, + } + + select { + case <-r.exit: + return nil + case r.advertChan <- u: } } diff --git a/network/router/router.go b/network/router/router.go index a8772768..9aa8789a 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,6 +1,8 @@ // Package router provides a network routing control plane package router +import "time" + var ( // DefaultRouter is default network router DefaultRouter = NewRouter() @@ -21,19 +23,21 @@ type Router interface { // Network returns the network address of the router Network() string // Advertise starts advertising routes to the network - Advertise() (<-chan *Advertisement, error) + Advertise() (<-chan *Update, error) // Update updates the routing table - Update(*Advertisement) error + Update(*Update) error // Stop stops the router Stop() error // String returns debug info String() string } -// Advertisement is sent by the router to the network -type Advertisement struct { +// Update is sent by the router to the network +type Update struct { // ID is the source router ID ID string + // Timestamp marks the time when update is sent + Timestamp time.Time // Event defines advertisement even Event *Event }