Table now has a dedicated package inside router package.

This commit is contained in:
Milos Gajdos 2019-07-08 16:51:55 +01:00
parent 0c1a28a9b6
commit cc590f5f2c
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
9 changed files with 71 additions and 62 deletions

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/micro/go-log" "github.com/micro/go-log"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter"
) )
@ -33,7 +34,7 @@ type router struct {
opts Options opts Options
status Status status Status
exit chan struct{} exit chan struct{}
eventChan chan *Event eventChan chan *table.Event
advertChan chan *Advert advertChan chan *Advert
wg *sync.WaitGroup wg *sync.WaitGroup
sync.RWMutex sync.RWMutex
@ -53,7 +54,7 @@ func newRouter(opts ...Option) Router {
opts: options, opts: options,
status: Status{Error: nil, Code: Stopped}, status: Status{Error: nil, Code: Stopped},
exit: make(chan struct{}), exit: make(chan struct{}),
eventChan: make(chan *Event), eventChan: make(chan *table.Event),
advertChan: make(chan *Advert), advertChan: make(chan *Advert),
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
} }
@ -78,7 +79,7 @@ func (r *router) ID() string {
} }
// Table returns routing table // Table returns routing table
func (r *router) Table() Table { func (r *router) Table() table.Table {
return r.opts.Table return r.opts.Table
} }
@ -99,7 +100,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m
action = strings.ToLower(action) action = strings.ToLower(action)
// take route action on each service node // take route action on each service node
for _, node := range service.Nodes { for _, node := range service.Nodes {
route := Route{ route := table.Route{
Destination: service.Name, Destination: service.Name,
Gateway: node.Address, Gateway: node.Address,
Router: r.opts.Address, Router: r.opts.Address,
@ -108,11 +109,11 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m
} }
switch action { switch action {
case "insert", "create": case "insert", "create":
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { if err := r.opts.Table.Add(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) return fmt.Errorf("failed adding route for service %s: %s", service.Name, err)
} }
case "delete": case "delete":
if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { if err := r.opts.Table.Delete(route); err != nil && err != table.ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err) return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err)
} }
default: default:
@ -138,7 +139,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metr
log.Logf("r.manageRegistryRoutes() GetService() error: %v", err) log.Logf("r.manageRegistryRoutes() GetService() error: %v", err)
continue continue
} }
// manage the routes for all return services // manage the routes for all returned services
for _, s := range srvs { for _, s := range srvs {
if err := r.manageServiceRoutes(s, action, metric); err != nil { if err := r.manageServiceRoutes(s, action, metric); err != nil {
return err return err
@ -174,7 +175,7 @@ func (r *router) watchServices(w registry.Watcher) error {
log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service)
if err := r.manageServiceRoutes(res.Service, res.Action, DefaultLocalMetric); err != nil { if err := r.manageServiceRoutes(res.Service, res.Action, table.DefaultLocalMetric); err != nil {
return err return err
} }
} }
@ -184,7 +185,7 @@ func (r *router) watchServices(w registry.Watcher) error {
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry. // It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error { func (r *router) watchTable(w table.Watcher) error {
// wait in the background for the router to stop // wait in the background for the router to stop
// when the router stops, stop the watcher and exit // when the router stops, stop the watcher and exit
r.wg.Add(1) r.wg.Add(1)
@ -199,11 +200,14 @@ func (r *router) watchTable(w Watcher) error {
for { for {
event, err := w.Next() event, err := w.Next()
if err != nil { if err != nil {
if err != ErrWatcherStopped { if err != table.ErrWatcherStopped {
watchErr = err watchErr = err
} }
break break
} }
log.Logf("r.watchTable() new table event: %s", event)
select { select {
case <-r.exit: case <-r.exit:
close(r.eventChan) close(r.eventChan)
@ -218,14 +222,14 @@ func (r *router) watchTable(w Watcher) error {
return watchErr return watchErr
} }
func eventFlap(curr, prev *Event) bool { func eventFlap(curr, prev *table.Event) bool {
if curr.Type == UpdateEvent && prev.Type == UpdateEvent { if curr.Type == table.Update && prev.Type == table.Update {
// update flap: this can be either metric or whatnot // update flap: this can be either metric or whatnot
log.Logf("eventFlap(): Update flap") log.Logf("eventFlap(): Update flap")
return true return true
} }
if curr.Type == CreateEvent && prev.Type == DeleteEvent || curr.Type == DeleteEvent && prev.Type == CreateEvent { if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create {
log.Logf("eventFlap(): Create/Delete flap") log.Logf("eventFlap(): Create/Delete flap")
return true return true
} }
@ -241,7 +245,7 @@ func (r *router) processEvents() error {
// advertEvent is a table event enriched with advert data // advertEvent is a table event enriched with advert data
type advertEvent struct { type advertEvent struct {
*Event *table.Event
timestamp time.Time timestamp time.Time
penalty float64 penalty float64
isSuppressed bool isSuppressed bool
@ -259,7 +263,7 @@ process:
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
var events []*Event var events []*table.Event
// decay the penalties of existing events // decay the penalties of existing events
mu.Lock() mu.Lock()
for advert, event := range eventMap { for advert, event := range eventMap {
@ -273,7 +277,7 @@ process:
event.isFlapping = false event.isFlapping = false
} }
if !event.isFlapping { if !event.isFlapping {
e := new(Event) e := new(table.Event)
*e = *event.Event *e = *event.Event
events = append(events, e) events = append(events, e)
// this deletes the advertised event from the map // this deletes the advertised event from the map
@ -284,7 +288,7 @@ process:
if len(events) > 0 { if len(events) > 0 {
wg.Add(1) wg.Add(1)
go func(events []*Event) { go func(events []*table.Event) {
defer wg.Done() defer wg.Done()
log.Logf("go advertise(): start") log.Logf("go advertise(): start")
@ -319,9 +323,9 @@ process:
// determine the event penalty // determine the event penalty
var penalty float64 var penalty float64
switch e.Type { switch e.Type {
case UpdateEvent: case table.Update:
penalty = UpdateRoutePenalty penalty = UpdateRoutePenalty
case CreateEvent, DeleteEvent: case table.Create, table.Delete:
penalty = DeleteRoutePenalty penalty = DeleteRoutePenalty
} }
// we use route hash as eventMap key // we use route hash as eventMap key
@ -366,8 +370,8 @@ process:
return nil return nil
} }
// manage watches router errors and takes appropriate actions // watchErrors watches router errors and takes appropriate actions
func (r *router) manage(errChan <-chan error) { func (r *router) watchErrors(errChan <-chan error) {
defer r.wg.Done() defer r.wg.Done()
log.Logf("r.manage(): manage start") log.Logf("r.manage(): manage start")
@ -382,7 +386,7 @@ func (r *router) manage(errChan <-chan error) {
code = Error code = Error
} }
log.Logf("r.manage(): manage exiting") log.Logf("r.watchErrors(): watchErrors exiting")
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -404,7 +408,7 @@ func (r *router) manage(errChan <-chan error) {
} }
} }
log.Logf("r.manage(): manage exit") log.Logf("r.watchErrors(): watchErrors exit")
} }
// Advertise advertises the routes to the network. // Advertise advertises the routes to the network.
@ -415,19 +419,19 @@ func (r *router) Advertise() (<-chan *Advert, error) {
if r.status.Code != Running { if r.status.Code != Running {
// add all local service routes into the routing table // add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.opts.Registry, "insert", DefaultLocalMetric); err != nil { if err := r.manageRegistryRoutes(r.opts.Registry, "insert", table.DefaultLocalMetric); err != nil {
return nil, fmt.Errorf("failed adding routes: %v", err) return nil, fmt.Errorf("failed adding routes: %v", err)
} }
log.Logf("Routing table:\n%s", r.opts.Table) log.Logf("Routing table:\n%s", r.opts.Table)
// add default gateway into routing table // add default gateway into routing table
if r.opts.Gateway != "" { if r.opts.Gateway != "" {
// note, the only non-default value is the gateway // note, the only non-default value is the gateway
route := Route{ route := table.Route{
Destination: "*", Destination: "*",
Gateway: r.opts.Gateway, Gateway: r.opts.Gateway,
Router: "*", Router: "*",
Network: "*", Network: "*",
Metric: DefaultLocalMetric, Metric: table.DefaultLocalMetric,
} }
if err := r.opts.Table.Add(route); err != nil { if err := r.opts.Table.Add(route); err != nil {
return nil, fmt.Errorf("failed adding default gateway route: %s", err) return nil, fmt.Errorf("failed adding default gateway route: %s", err)
@ -438,12 +442,12 @@ func (r *router) Advertise() (<-chan *Advert, error) {
// TODO: these channels most likely won't have to be the struct fields // TODO: these channels most likely won't have to be the struct fields
if r.status.Code == Error || r.status.Code == Stopped { if r.status.Code == Error || r.status.Code == Stopped {
r.exit = make(chan struct{}) r.exit = make(chan struct{})
r.eventChan = make(chan *Event) r.eventChan = make(chan *table.Event)
r.advertChan = make(chan *Advert) r.advertChan = make(chan *Advert)
} }
// routing table watcher which watches all routes i.e. to every destination // routing table watcher which watches all routes i.e. to every destination
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) tableWatcher, err := r.opts.Table.Watch(table.WatchDestination("*"))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed creating routing table watcher: %v", err) return nil, fmt.Errorf("failed creating routing table watcher: %v", err)
} }
@ -484,7 +488,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
}() }()
r.wg.Add(1) r.wg.Add(1)
go r.manage(errChan) go r.watchErrors(errChan)
// mark router as running and set its Error to nil // mark router as running and set its Error to nil
status := Status{ status := Status{
@ -501,7 +505,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
func (r *router) Update(a *Advert) error { func (r *router) Update(a *Advert) error {
// NOTE: event sorting might not be necessary // NOTE: event sorting might not be necessary
// copy update events intp new slices // copy update events intp new slices
events := make([]*Event, len(a.Events)) events := make([]*table.Event, len(a.Events))
copy(events, a.Events) copy(events, a.Events)
// sort events by timestamp // sort events by timestamp
sort.Slice(events, func(i, j int) bool { sort.Slice(events, func(i, j int) bool {
@ -510,13 +514,13 @@ func (r *router) Update(a *Advert) error {
for _, event := range events { for _, event := range events {
// we extract the route from advertisement and update the routing table // we extract the route from advertisement and update the routing table
route := Route{ route := table.Route{
Destination: event.Route.Destination, Destination: event.Route.Destination,
Gateway: event.Route.Gateway, Gateway: event.Route.Gateway,
Router: event.Route.Router, Router: event.Route.Router,
Network: event.Route.Network, Network: event.Route.Network,
Metric: event.Route.Metric, Metric: event.Route.Metric,
Policy: Insert, Policy: table.Insert,
} }
if err := r.opts.Table.Update(route); err != nil { if err := r.opts.Table.Update(route); err != nil {
return fmt.Errorf("failed updating routing table: %v", err) return fmt.Errorf("failed updating routing table: %v", err)

View File

@ -2,6 +2,7 @@ package router
import ( import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
@ -25,7 +26,7 @@ type Options struct {
// Registry is the local registry // Registry is the local registry
Registry registry.Registry Registry registry.Registry
// Table is routing table // Table is routing table
Table Table Table table.Table
} }
// ID sets Router ID // ID sets Router ID
@ -57,7 +58,7 @@ func Gateway(g string) Option {
} }
// RoutingTable sets the routing table // RoutingTable sets the routing table
func RoutingTable(t Table) Option { func RoutingTable(t table.Table) Option {
return func(o *Options) { return func(o *Options) {
o.Table = t o.Table = t
} }
@ -77,6 +78,6 @@ func DefaultOptions() Options {
Address: DefaultAddress, Address: DefaultAddress,
Network: DefaultNetwork, Network: DefaultNetwork,
Registry: registry.DefaultRegistry, Registry: registry.DefaultRegistry,
Table: NewTable(), Table: table.NewTable(),
} }
} }

View File

@ -1,7 +1,11 @@
// Package router provides a network routing control plane // Package router provides a network routing control plane
package router package router
import "time" import (
"time"
"github.com/micro/go-micro/network/router/table"
)
var ( var (
// DefaultRouter is default network router // DefaultRouter is default network router
@ -17,7 +21,7 @@ type Router interface {
// ID returns the ID of the router // ID returns the ID of the router
ID() string ID() string
// Table returns the routing table // Table returns the routing table
Table() Table Table() table.Table
// Address returns the router adddress // Address returns the router adddress
Address() string Address() string
// Network returns the network address of the router // Network returns the network address of the router
@ -65,8 +69,8 @@ type Advert struct {
ID string ID string
// Timestamp marks the time when the update is sent // Timestamp marks the time when the update is sent
Timestamp time.Time Timestamp time.Time
// Events is a list of events to advertise // Events is a list of routing table events to advertise
Events []*Event Events []*table.Event
} }
// StatusCode defines router status // StatusCode defines router status

View File

@ -1,4 +1,4 @@
package router package table
import ( import (
"fmt" "fmt"
@ -67,21 +67,21 @@ func (t *table) Add(r Route) error {
if _, ok := t.m[destAddr]; !ok { if _, ok := t.m[destAddr]; !ok {
t.m[destAddr] = make(map[uint64]Route) t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r}) go t.sendEvent(&Event{Type: Create, Route: r})
return nil return nil
} }
// add new route to the table for the route destination // add new route to the table for the route destination
if _, ok := t.m[destAddr][sum]; !ok { if _, ok := t.m[destAddr][sum]; !ok {
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r}) go t.sendEvent(&Event{Type: Create, Route: r})
return nil return nil
} }
// only add the route if the route override is explicitly requested // only add the route if the route override is explicitly requested
if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override {
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) go t.sendEvent(&Event{Type: Update, Route: r})
return nil return nil
} }
@ -107,7 +107,7 @@ func (t *table) Delete(r Route) error {
} }
delete(t.m[destAddr], sum) delete(t.m[destAddr], sum)
go t.sendEvent(&Event{Type: DeleteEvent, Route: r}) go t.sendEvent(&Event{Type: Delete, Route: r})
return nil return nil
} }
@ -125,7 +125,7 @@ func (t *table) Update(r Route) error {
if r.Policy == Insert { if r.Policy == Insert {
t.m[destAddr] = make(map[uint64]Route) t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r}) go t.sendEvent(&Event{Type: Create, Route: r})
return nil return nil
} }
return ErrRouteNotFound return ErrRouteNotFound
@ -135,14 +135,14 @@ func (t *table) Update(r Route) error {
// NOTE: We only insert the route if explicitly requested by the client // NOTE: We only insert the route if explicitly requested by the client
if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert {
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r}) go t.sendEvent(&Event{Type: Create, Route: r})
return nil return nil
} }
// if the route has been found update it // if the route has been found update it
if _, ok := t.m[destAddr][sum]; ok { if _, ok := t.m[destAddr][sum]; ok {
t.m[destAddr][sum] = r t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) go t.sendEvent(&Event{Type: Update, Route: r})
return nil return nil
} }

View File

@ -1,4 +1,4 @@
package router package table
import "testing" import "testing"

View File

@ -1,4 +1,4 @@
package router package table
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package router package table
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package router package table
import ( import (
"errors" "errors"

View File

@ -1,4 +1,4 @@
package router package table
import ( import (
"errors" "errors"
@ -19,22 +19,22 @@ var (
type EventType int type EventType int
const ( const (
// CreateEvent is emitted when a new route has been created // Create is emitted when a new route has been created
CreateEvent EventType = iota Create EventType = iota
// DeleteEvent is emitted when an existing route has been deleted // Delete is emitted when an existing route has been deleted
DeleteEvent Delete
// UpdateEvent is emitted when an existing route has been updated // Update is emitted when an existing route has been updated
UpdateEvent Update
) )
// String returns string representation of the event // String returns string representation of the event
func (et EventType) String() string { func (et EventType) String() string {
switch et { switch et {
case CreateEvent: case Create:
return "CREATE" return "CREATE"
case DeleteEvent: case Delete:
return "DELETE" return "DELETE"
case UpdateEvent: case Update:
return "UPDATE" return "UPDATE"
default: default:
return "UNKNOWN" return "UNKNOWN"
@ -53,7 +53,7 @@ type Event struct {
// String prints human readable Event // String prints human readable Event
func (e Event) String() string { func (e Event) String() string {
return fmt.Sprintf("[EVENT] Type: %s\nRoute:\n%s", e.Type, e.Route) return fmt.Sprintf("[EVENT] %s:\nRoute:\n%s", e.Type, e.Route)
} }
// WatchOption is used to define what routes to watch in the table // WatchOption is used to define what routes to watch in the table