diff --git a/network/router/default_router.go b/network/router/default.go similarity index 89% rename from network/router/default_router.go rename to network/router/default.go index a9517f9d..44e07108 100644 --- a/network/router/default_router.go +++ b/network/router/default.go @@ -9,6 +9,7 @@ import ( "time" "github.com/micro/go-log" + "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" ) @@ -33,7 +34,7 @@ type router struct { opts Options status Status exit chan struct{} - eventChan chan *Event + eventChan chan *table.Event advertChan chan *Advert wg *sync.WaitGroup sync.RWMutex @@ -53,7 +54,7 @@ func newRouter(opts ...Option) Router { opts: options, status: Status{Error: nil, Code: Stopped}, exit: make(chan struct{}), - eventChan: make(chan *Event), + eventChan: make(chan *table.Event), advertChan: make(chan *Advert), wg: &sync.WaitGroup{}, } @@ -78,7 +79,7 @@ func (r *router) ID() string { } // Table returns routing table -func (r *router) Table() Table { +func (r *router) Table() table.Table { return r.opts.Table } @@ -99,7 +100,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m action = strings.ToLower(action) // take route action on each service node for _, node := range service.Nodes { - route := Route{ + route := table.Route{ Destination: service.Name, Gateway: node.Address, Router: r.opts.Address, @@ -108,11 +109,11 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m } switch action { case "insert", "create": - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + if err := r.opts.Table.Add(route); err != nil && err != table.ErrDuplicateRoute { return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) } case "delete": - if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + if err := r.opts.Table.Delete(route); err != nil && err != table.ErrRouteNotFound { return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err) } default: @@ -138,7 +139,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metr log.Logf("r.manageRegistryRoutes() GetService() error: %v", err) continue } - // manage the routes for all return services + // manage the routes for all returned services for _, s := range srvs { if err := r.manageServiceRoutes(s, action, metric); err != nil { return err @@ -174,7 +175,7 @@ func (r *router) watchServices(w registry.Watcher) error { log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) - if err := r.manageServiceRoutes(res.Service, res.Action, DefaultLocalMetric); err != nil { + if err := r.manageServiceRoutes(res.Service, res.Action, table.DefaultLocalMetric); err != nil { return err } } @@ -184,7 +185,7 @@ func (r *router) watchServices(w registry.Watcher) error { // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // It returns error if the locally registered services either fails to be added/deleted to/from network registry. -func (r *router) watchTable(w Watcher) error { +func (r *router) watchTable(w table.Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -199,11 +200,14 @@ func (r *router) watchTable(w Watcher) error { for { event, err := w.Next() if err != nil { - if err != ErrWatcherStopped { + if err != table.ErrWatcherStopped { watchErr = err } break } + + log.Logf("r.watchTable() new table event: %s", event) + select { case <-r.exit: close(r.eventChan) @@ -218,14 +222,14 @@ func (r *router) watchTable(w Watcher) error { return watchErr } -func eventFlap(curr, prev *Event) bool { - if curr.Type == UpdateEvent && prev.Type == UpdateEvent { +func eventFlap(curr, prev *table.Event) bool { + if curr.Type == table.Update && prev.Type == table.Update { // update flap: this can be either metric or whatnot log.Logf("eventFlap(): Update flap") return true } - if curr.Type == CreateEvent && prev.Type == DeleteEvent || curr.Type == DeleteEvent && prev.Type == CreateEvent { + if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create { log.Logf("eventFlap(): Create/Delete flap") return true } @@ -241,7 +245,7 @@ func (r *router) processEvents() error { // advertEvent is a table event enriched with advert data type advertEvent struct { - *Event + *table.Event timestamp time.Time penalty float64 isSuppressed bool @@ -259,7 +263,7 @@ process: for { select { case <-ticker.C: - var events []*Event + var events []*table.Event // decay the penalties of existing events mu.Lock() for advert, event := range eventMap { @@ -273,7 +277,7 @@ process: event.isFlapping = false } if !event.isFlapping { - e := new(Event) + e := new(table.Event) *e = *event.Event events = append(events, e) // this deletes the advertised event from the map @@ -284,7 +288,7 @@ process: if len(events) > 0 { wg.Add(1) - go func(events []*Event) { + go func(events []*table.Event) { defer wg.Done() log.Logf("go advertise(): start") @@ -319,9 +323,9 @@ process: // determine the event penalty var penalty float64 switch e.Type { - case UpdateEvent: + case table.Update: penalty = UpdateRoutePenalty - case CreateEvent, DeleteEvent: + case table.Create, table.Delete: penalty = DeleteRoutePenalty } // we use route hash as eventMap key @@ -366,8 +370,8 @@ process: return nil } -// manage watches router errors and takes appropriate actions -func (r *router) manage(errChan <-chan error) { +// watchErrors watches router errors and takes appropriate actions +func (r *router) watchErrors(errChan <-chan error) { defer r.wg.Done() log.Logf("r.manage(): manage start") @@ -382,7 +386,7 @@ func (r *router) manage(errChan <-chan error) { code = Error } - log.Logf("r.manage(): manage exiting") + log.Logf("r.watchErrors(): watchErrors exiting") r.Lock() defer r.Unlock() @@ -404,7 +408,7 @@ func (r *router) manage(errChan <-chan error) { } } - log.Logf("r.manage(): manage exit") + log.Logf("r.watchErrors(): watchErrors exit") } // Advertise advertises the routes to the network. @@ -415,19 +419,19 @@ func (r *router) Advertise() (<-chan *Advert, error) { if r.status.Code != Running { // add all local service routes into the routing table - if err := r.manageRegistryRoutes(r.opts.Registry, "insert", DefaultLocalMetric); err != nil { + if err := r.manageRegistryRoutes(r.opts.Registry, "insert", table.DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) } log.Logf("Routing table:\n%s", r.opts.Table) // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway - route := Route{ + route := table.Route{ Destination: "*", Gateway: r.opts.Gateway, Router: "*", Network: "*", - Metric: DefaultLocalMetric, + Metric: table.DefaultLocalMetric, } if err := r.opts.Table.Add(route); err != nil { return nil, fmt.Errorf("failed adding default gateway route: %s", err) @@ -438,12 +442,12 @@ func (r *router) Advertise() (<-chan *Advert, error) { // TODO: these channels most likely won't have to be the struct fields if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) - r.eventChan = make(chan *Event) + r.eventChan = make(chan *table.Event) r.advertChan = make(chan *Advert) } // routing table watcher which watches all routes i.e. to every destination - tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) + tableWatcher, err := r.opts.Table.Watch(table.WatchDestination("*")) if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } @@ -484,7 +488,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { }() r.wg.Add(1) - go r.manage(errChan) + go r.watchErrors(errChan) // mark router as running and set its Error to nil status := Status{ @@ -501,7 +505,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { func (r *router) Update(a *Advert) error { // NOTE: event sorting might not be necessary // copy update events intp new slices - events := make([]*Event, len(a.Events)) + events := make([]*table.Event, len(a.Events)) copy(events, a.Events) // sort events by timestamp sort.Slice(events, func(i, j int) bool { @@ -510,13 +514,13 @@ func (r *router) Update(a *Advert) error { for _, event := range events { // we extract the route from advertisement and update the routing table - route := Route{ + route := table.Route{ Destination: event.Route.Destination, Gateway: event.Route.Gateway, Router: event.Route.Router, Network: event.Route.Network, Metric: event.Route.Metric, - Policy: Insert, + Policy: table.Insert, } if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) diff --git a/network/router/options.go b/network/router/options.go index f6413a53..eb287075 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -2,6 +2,7 @@ package router import ( "github.com/google/uuid" + "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" ) @@ -25,7 +26,7 @@ type Options struct { // Registry is the local registry Registry registry.Registry // Table is routing table - Table Table + Table table.Table } // ID sets Router ID @@ -57,7 +58,7 @@ func Gateway(g string) Option { } // RoutingTable sets the routing table -func RoutingTable(t Table) Option { +func RoutingTable(t table.Table) Option { return func(o *Options) { o.Table = t } @@ -77,6 +78,6 @@ func DefaultOptions() Options { Address: DefaultAddress, Network: DefaultNetwork, Registry: registry.DefaultRegistry, - Table: NewTable(), + Table: table.NewTable(), } } diff --git a/network/router/router.go b/network/router/router.go index ea91538c..e0f7d676 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,7 +1,11 @@ // Package router provides a network routing control plane package router -import "time" +import ( + "time" + + "github.com/micro/go-micro/network/router/table" +) var ( // DefaultRouter is default network router @@ -17,7 +21,7 @@ type Router interface { // ID returns the ID of the router ID() string // Table returns the routing table - Table() Table + Table() table.Table // Address returns the router adddress Address() string // Network returns the network address of the router @@ -65,8 +69,8 @@ type Advert struct { ID string // Timestamp marks the time when the update is sent Timestamp time.Time - // Events is a list of events to advertise - Events []*Event + // Events is a list of routing table events to advertise + Events []*table.Event } // StatusCode defines router status diff --git a/network/router/default_table.go b/network/router/table/default.go similarity index 93% rename from network/router/default_table.go rename to network/router/table/default.go index 9ae35e41..3480183e 100644 --- a/network/router/default_table.go +++ b/network/router/table/default.go @@ -1,4 +1,4 @@ -package router +package table import ( "fmt" @@ -67,21 +67,21 @@ func (t *table) Add(r Route) error { if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } // add new route to the table for the route destination if _, ok := t.m[destAddr][sum]; !ok { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } // only add the route if the route override is explicitly requested if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) + go t.sendEvent(&Event{Type: Update, Route: r}) return nil } @@ -107,7 +107,7 @@ func (t *table) Delete(r Route) error { } delete(t.m[destAddr], sum) - go t.sendEvent(&Event{Type: DeleteEvent, Route: r}) + go t.sendEvent(&Event{Type: Delete, Route: r}) return nil } @@ -125,7 +125,7 @@ func (t *table) Update(r Route) error { if r.Policy == Insert { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } return ErrRouteNotFound @@ -135,14 +135,14 @@ func (t *table) Update(r Route) error { // NOTE: We only insert the route if explicitly requested by the client if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } // if the route has been found update it if _, ok := t.m[destAddr][sum]; ok { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) + go t.sendEvent(&Event{Type: Update, Route: r}) return nil } diff --git a/network/router/default_table_test.go b/network/router/table/default_test.go similarity index 99% rename from network/router/default_table_test.go rename to network/router/table/default_test.go index 46b8b034..a0c364b7 100644 --- a/network/router/default_table_test.go +++ b/network/router/table/default_test.go @@ -1,4 +1,4 @@ -package router +package table import "testing" diff --git a/network/router/query.go b/network/router/table/query.go similarity index 99% rename from network/router/query.go rename to network/router/table/query.go index befdd1d9..ccb7d396 100644 --- a/network/router/query.go +++ b/network/router/table/query.go @@ -1,4 +1,4 @@ -package router +package table import ( "fmt" diff --git a/network/router/route.go b/network/router/table/route.go similarity index 99% rename from network/router/route.go rename to network/router/table/route.go index f995d9d4..3d9c3bcb 100644 --- a/network/router/route.go +++ b/network/router/table/route.go @@ -1,4 +1,4 @@ -package router +package table import ( "fmt" diff --git a/network/router/table.go b/network/router/table/table.go similarity index 98% rename from network/router/table.go rename to network/router/table/table.go index c00484cd..9d353a54 100644 --- a/network/router/table.go +++ b/network/router/table/table.go @@ -1,4 +1,4 @@ -package router +package table import ( "errors" diff --git a/network/router/table_watcher.go b/network/router/table/watcher.go similarity index 87% rename from network/router/table_watcher.go rename to network/router/table/watcher.go index 2c5d8989..850a9089 100644 --- a/network/router/table_watcher.go +++ b/network/router/table/watcher.go @@ -1,4 +1,4 @@ -package router +package table import ( "errors" @@ -19,22 +19,22 @@ var ( type EventType int const ( - // CreateEvent is emitted when a new route has been created - CreateEvent EventType = iota - // DeleteEvent is emitted when an existing route has been deleted - DeleteEvent - // UpdateEvent is emitted when an existing route has been updated - UpdateEvent + // Create is emitted when a new route has been created + Create EventType = iota + // Delete is emitted when an existing route has been deleted + Delete + // Update is emitted when an existing route has been updated + Update ) // String returns string representation of the event func (et EventType) String() string { switch et { - case CreateEvent: + case Create: return "CREATE" - case DeleteEvent: + case Delete: return "DELETE" - case UpdateEvent: + case Update: return "UPDATE" default: return "UNKNOWN" @@ -53,7 +53,7 @@ type Event struct { // String prints human readable Event func (e Event) String() string { - return fmt.Sprintf("[EVENT] Type: %s\nRoute:\n%s", e.Type, e.Route) + return fmt.Sprintf("[EVENT] %s:\nRoute:\n%s", e.Type, e.Route) } // WatchOption is used to define what routes to watch in the table