Merge pull request #573 from milosgajdos83/flap-detection

Router rework. Flap detection. Table package.
This commit is contained in:
Asim Aslam 2019-07-10 07:12:18 +01:00 committed by GitHub
commit 34967e8e33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1066 additions and 1025 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/network/router" "github.com/micro/go-micro/network/router"
pb "github.com/micro/go-micro/network/router/proto" pb "github.com/micro/go-micro/network/router/proto"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
@ -40,11 +41,11 @@ type clientKey struct{}
type routerKey struct{} type routerKey struct{}
// getRoutes returns the routes whether they are remote or local // getRoutes returns the routes whether they are remote or local
func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { func (r *routerSelector) getRoutes(service string) ([]table.Route, error) {
if !r.remote { if !r.remote {
// lookup router for routes for the service // lookup router for routes for the service
return r.r.Table().Lookup(router.NewQuery( return r.r.Table().Lookup(table.NewQuery(
router.QueryDestination(service), table.QueryService(service),
)) ))
} }
@ -82,7 +83,7 @@ func (r *routerSelector) getRoutes(service string) ([]router.Route, error) {
// call the router // call the router
pbRoutes, err = r.rs.Lookup(context.Background(), &pb.LookupRequest{ pbRoutes, err = r.rs.Lookup(context.Background(), &pb.LookupRequest{
Query: &pb.Query{ Query: &pb.Query{
Destination: service, Service: service,
}, },
}, client.WithAddress(addr)) }, client.WithAddress(addr))
if err != nil { if err != nil {
@ -101,16 +102,17 @@ func (r *routerSelector) getRoutes(service string) ([]router.Route, error) {
return nil, selector.ErrNoneAvailable return nil, selector.ErrNoneAvailable
} }
var routes []router.Route var routes []table.Route
// convert from pb to []*router.Route // convert from pb to []*router.Route
for _, r := range pbRoutes.Routes { for _, r := range pbRoutes.Routes {
routes = append(routes, router.Route{ routes = append(routes, table.Route{
Destination: r.Destination, Service: r.Service,
Gateway: r.Gateway, Address: r.Address,
Router: r.Router, Gateway: r.Gateway,
Network: r.Network, Network: r.Network,
Metric: int(r.Metric), Link: r.Link,
Metric: int(r.Metric),
}) })
} }

View File

@ -18,6 +18,7 @@ import (
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
pb "github.com/micro/go-micro/network/router/proto" pb "github.com/micro/go-micro/network/router/proto"
"github.com/micro/go-micro/network/router/table"
) )
// Proxy will transparently proxy requests to an endpoint. // Proxy will transparently proxy requests to an endpoint.
@ -40,7 +41,7 @@ type Proxy struct {
// A fib of routes service:address // A fib of routes service:address
sync.RWMutex sync.RWMutex
Routes map[string][]router.Route Routes map[string][]table.Route
} }
// read client request and write to server // read client request and write to server
@ -80,7 +81,7 @@ func readLoop(r server.Request, s client.Stream) error {
func (p *Proxy) getRoute(service string) ([]string, error) { func (p *Proxy) getRoute(service string) ([]string, error) {
// converts routes to just addresses // converts routes to just addresses
toNodes := func(routes []router.Route) []string { toNodes := func(routes []table.Route) []string {
var nodes []string var nodes []string
for _, node := range routes { for _, node := range routes {
nodes = append(nodes, node.Gateway) nodes = append(nodes, node.Gateway)
@ -106,7 +107,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
if p.Router != nil { if p.Router != nil {
// lookup the router // lookup the router
routes, err := p.Router.Table().Lookup( routes, err := p.Router.Table().Lookup(
router.NewQuery(router.QueryDestination(service)), table.NewQuery(table.QueryService(service)),
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -114,7 +115,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
p.Lock() p.Lock()
if p.Routes == nil { if p.Routes == nil {
p.Routes = make(map[string][]router.Route) p.Routes = make(map[string][]table.Route)
} }
p.Routes[service] = routes p.Routes[service] = routes
p.Unlock() p.Unlock()
@ -179,7 +180,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
// call the router // call the router
proutes, err := p.RouterService.Lookup(context.Background(), &pb.LookupRequest{ proutes, err := p.RouterService.Lookup(context.Background(), &pb.LookupRequest{
Query: &pb.Query{ Query: &pb.Query{
Destination: service, Service: service,
}, },
}, client.WithAddress(addr)) }, client.WithAddress(addr))
if err != nil { if err != nil {
@ -203,12 +204,13 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
// convert from pb to []*router.Route // convert from pb to []*router.Route
for _, r := range pbRoutes.Routes { for _, r := range pbRoutes.Routes {
routes = append(routes, router.Route{ routes = append(routes, table.Route{
Destination: r.Destination, Service: r.Service,
Gateway: r.Gateway, Address: r.Address,
Router: r.Router, Gateway: r.Gateway,
Network: r.Network, Network: r.Network,
Metric: int(r.Metric), Link: r.Link,
Metric: int(r.Metric),
}) })
} }

551
network/router/default.go Normal file
View File

@ -0,0 +1,551 @@
package router
import (
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry"
)
const (
// AdvertiseTick is time interval in which we advertise route updates
AdvertiseTick = 5 * time.Second
// AdvertSuppress is advert suppression threshold
AdvertSuppress = 2000
// AdvertRecover is advert recovery threshold
AdvertRecover = 750
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = time.Minute
// PenaltyDecay is the penalty decay
PenaltyDecay = 1.15
// Delete penalises route addition and deletion
Delete = 1000
// UpdatePenalty penalises route updates
UpdatePenalty = 500
)
// router provides default router implementation
type router struct {
opts Options
status Status
exit chan struct{}
eventChan chan *table.Event
advertChan chan *Advert
advertWg *sync.WaitGroup
wg *sync.WaitGroup
sync.RWMutex
}
// newRouter creates a new router and returns it
func newRouter(opts ...Option) Router {
// get default options
options := DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
return &router{
opts: options,
status: Status{Error: nil, Code: Stopped},
exit: make(chan struct{}),
eventChan: make(chan *table.Event),
advertChan: make(chan *Advert),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
}
}
// Init initializes router with given options
func (r *router) Init(opts ...Option) error {
for _, o := range opts {
o(&r.opts)
}
return nil
}
// Options returns router options
func (r *router) Options() Options {
return r.opts
}
// ID returns router ID
func (r *router) ID() string {
return r.opts.ID
}
// Table returns routing table
func (r *router) Table() table.Table {
return r.opts.Table
}
// Address returns router's bind address
func (r *router) Address() string {
return r.opts.Address
}
// Network returns the address router advertises to the network
func (r *router) Network() string {
return r.opts.Network
}
// manageServiceRoutes manages routes for a given service.
// It returns error of the routing table action fails.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
// action is the routing table action
action = strings.ToLower(action)
// take route action on each service node
for _, node := range service.Nodes {
route := table.Route{
Service: service.Name,
Address: node.Address,
Gateway: "",
Network: r.opts.Network,
Link: table.DefaultLink,
Metric: table.DefaultLocalMetric,
}
switch action {
case "insert", "create":
if err := r.opts.Table.Add(route); err != nil && err != table.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", service.Name, err)
}
case "delete":
if err := r.opts.Table.Delete(route); err != nil && err != table.ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err)
}
default:
return fmt.Errorf("failed to manage route for service %v. Unknown action: %s", service.Name, action)
}
}
return nil
}
// manageRegistryRoutes manages routes for each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error {
services, err := reg.ListServices()
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
}
// add each service node as a separate route
for _, service := range services {
// get the service to retrieve all its info
srvs, err := reg.GetService(service.Name)
if err != nil {
continue
}
// manage the routes for all returned services
for _, s := range srvs {
if err := r.manageServiceRoutes(s, action); err != nil {
return err
}
}
}
return nil
}
// watchServices watches services in given registry and updates the routing table accordingly.
// It returns error if the service registry watcher stops or if the routing table can't be updated.
func (r *router) watchServices(w registry.Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()
var watchErr error
for {
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
watchErr = err
}
break
}
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
return err
}
}
return watchErr
}
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w table.Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()
var watchErr error
for {
event, err := w.Next()
if err != nil {
if err != table.ErrWatcherStopped {
watchErr = err
}
break
}
select {
case <-r.exit:
close(r.eventChan)
return nil
case r.eventChan <- event:
}
}
// close event channel on error
close(r.eventChan)
return watchErr
}
func (r *router) advertEvents(advType AdvertType, events []*table.Event) {
defer r.advertWg.Done()
a := &Advert{
ID: r.ID(),
Type: advType,
Timestamp: time.Now(),
Events: events,
}
select {
case r.advertChan <- a:
case <-r.exit:
return
}
}
// isFlapping detects if the event is flapping based on the current and previous event status.
func isFlapping(curr, prev *table.Event) bool {
if curr.Type == table.Update && prev.Type == table.Update {
return true
}
if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert {
return true
}
return false
}
// updateEvent is a table event enriched with advertisement data
type updateEvent struct {
*table.Event
// timestamp marks the time the event has been received
timestamp time.Time
// penalty is current event penalty
penalty float64
// isSuppressed flags if the event should be considered for flap detection
isSuppressed bool
// isFlapping marks the event as flapping event
isFlapping bool
}
// processEvents processes routing table events.
// It suppresses unhealthy flapping events and advertises healthy events upstream.
func (r *router) processEvents() error {
// ticker to periodically scan event for advertising
ticker := time.NewTicker(AdvertiseTick)
// eventMap is a map of advert events
eventMap := make(map[uint64]*updateEvent)
for {
select {
case <-ticker.C:
var events []*table.Event
// collect all events which are not flapping
for key, event := range eventMap {
if !event.isFlapping && !event.isSuppressed {
e := new(table.Event)
*e = *event.Event
events = append(events, e)
// this deletes the advertised event from the map
delete(eventMap, key)
}
}
if len(events) > 0 {
r.advertWg.Add(1)
go r.advertEvents(Update, events)
}
case e := <-r.eventChan:
// event timestamp
now := time.Now()
// if event is nil, continue
if e == nil {
continue
}
// determine the event penalty
var penalty float64
switch e.Type {
case table.Update:
penalty = UpdatePenalty
case table.Delete:
penalty = Delete
}
// we use route hash as eventMap key
hash := e.Route.Hash()
event, ok := eventMap[hash]
if !ok {
event = &updateEvent{
Event: e,
penalty: penalty,
timestamp: time.Now(),
}
eventMap[hash] = event
continue
}
// update penalty for existing event: decay existing and add new penalty
delta := time.Since(event.timestamp).Seconds()
event.penalty = event.penalty*math.Exp(-delta) + penalty
event.timestamp = now
// suppress or recover the event based on its current penalty
if !event.isSuppressed && event.penalty > AdvertSuppress {
event.isSuppressed = true
} else if event.penalty < AdvertRecover {
event.isSuppressed = false
}
// if not suppressed decide if if its flapping
if !event.isSuppressed {
// detect if its flapping by comparing current and previous event
event.isFlapping = isFlapping(e, event.Event)
}
case <-r.exit:
// first wait for the advertiser to finish
r.advertWg.Wait()
// close the advert channel
close(r.advertChan)
return nil
}
}
// we probably never reach this place
return nil
}
// watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors(errChan <-chan error) {
defer r.wg.Done()
var code StatusCode
var err error
select {
case <-r.exit:
code = Stopped
case err = <-errChan:
code = Error
}
r.Lock()
defer r.Unlock()
status := Status{
Code: code,
Error: err,
}
r.status = status
// stop the router if some error happened
if err != nil && code != Stopped {
// this will stop watchers which will close r.advertChan
close(r.exit)
// drain the advertise channel
for range r.advertChan {
}
// drain the event channel
for range r.eventChan {
}
}
}
// Advertise advertises the routes to the network.
// It returns error if any of the launched goroutines fail with error.
func (r *router) Advertise() (<-chan *Advert, error) {
r.Lock()
defer r.Unlock()
if r.status.Code != Running {
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.opts.Registry, "insert"); err != nil {
return nil, fmt.Errorf("failed adding routes: %s", err)
}
// list routing table routes to announce
routes, err := r.opts.Table.List()
if err != nil {
return nil, fmt.Errorf("failed listing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*table.Event, len(routes))
for i, route := range routes {
event := &table.Event{
Type: table.Insert,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := table.Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Network: "*",
Metric: table.DefaultLocalMetric,
}
if err := r.opts.Table.Add(route); err != nil {
return nil, fmt.Errorf("failed adding default gateway route: %s", err)
}
}
// NOTE: we only need to recreate these if the router errored or was stopped
// TODO: These probably dont need to be struct members
if r.status.Code == Error || r.status.Code == Stopped {
r.exit = make(chan struct{})
r.eventChan = make(chan *table.Event)
r.advertChan = make(chan *Advert)
}
// routing table watcher
tableWatcher, err := r.opts.Table.Watch()
if err != nil {
return nil, fmt.Errorf("failed creating routing table watcher: %v", err)
}
// service registry watcher
svcWatcher, err := r.opts.Registry.Watch()
if err != nil {
return nil, fmt.Errorf("failed creating service registry watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 3)
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.watchServices(svcWatcher)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routing table
errChan <- r.watchTable(tableWatcher)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// listen to routing table events and process them
errChan <- r.processEvents()
}()
// watch for errors and cleanup
r.wg.Add(1)
go r.watchErrors(errChan)
// advertise your presence
r.advertWg.Add(1)
go r.advertEvents(Announce, events)
// mark router as running and set its Error to nil
status := Status{
Code: Running,
Error: nil,
}
r.status = status
}
return r.advertChan, nil
}
// Update updates the routing table using the advertised values
func (r *router) Update(a *Advert) error {
// NOTE: event sorting might not be necessary
// copy update events intp new slices
events := make([]*table.Event, len(a.Events))
copy(events, a.Events)
// sort events by timestamp
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp.Before(events[j].Timestamp)
})
for _, event := range events {
// create a copy of the route
route := event.Route
if err := r.opts.Table.Update(route); err != nil {
return fmt.Errorf("failed updating routing table: %v", err)
}
}
return nil
}
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
// make a copy of the status
status := r.status
return status
}
// Stop stops the router
func (r *router) Stop() error {
r.RLock()
// only close the channel if the router is running
if r.status.Code == Running {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel
for range r.advertChan {
}
// drain the event channel
for range r.eventChan {
}
}
r.RUnlock()
// wait for all goroutines to finish
r.wg.Wait()
return nil
}
// String prints debugging information about router
func (r router) String() string {
return "router"
}

View File

@ -1,378 +0,0 @@
package router
import (
"fmt"
"strings"
"sync"
"time"
"github.com/micro/go-micro/registry"
"github.com/olekukonko/tablewriter"
)
// router provides default router implementation
type router struct {
opts Options
status Status
advertChan chan *Update
exit chan struct{}
wg *sync.WaitGroup
sync.RWMutex
}
// newRouter creates new router and returns it
func newRouter(opts ...Option) Router {
// get default options
options := DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
return &router{
opts: options,
status: Status{Error: nil, Code: Init},
advertChan: make(chan *Update),
exit: make(chan struct{}),
wg: &sync.WaitGroup{},
}
}
// Init initializes router with given options
func (r *router) Init(opts ...Option) error {
for _, o := range opts {
o(&r.opts)
}
return nil
}
// Options returns router options
func (r *router) Options() Options {
return r.opts
}
// ID returns router ID
func (r *router) ID() string {
return r.opts.ID
}
// Table returns routing table
func (r *router) Table() Table {
return r.opts.Table
}
// Address returns router's bind address
func (r *router) Address() string {
return r.opts.Address
}
// Network returns the address router advertises to the network
func (r *router) Network() string {
return r.opts.Network
}
// addServiceRoutes adds all services in given registry to the routing table.
// NOTE: this is a one-off operation done when bootstrapping the routing table
// It returns error if either the services failed to be listed or
// if any of the the routes could not be added to the routing table.
func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error {
services, err := reg.ListServices()
if err != nil {
return fmt.Errorf("failed to list services: %v", err)
}
// add each service node as a separate route
for _, service := range services {
// get the service to retrieve all its info
srvs, err := reg.GetService(service.Name)
if err != nil {
continue
}
// create a flat slide of nodes
var nodes []*registry.Node
for _, s := range srvs {
nodes = append(nodes, s.Nodes...)
}
// range over the flat slice of nodes
for _, node := range nodes {
route := Route{
Destination: service.Name,
Gateway: node.Address,
Router: r.opts.Address,
Network: r.opts.Network,
Metric: metric,
}
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("error adding route for service %s: %s", service.Name, err)
}
}
}
return nil
}
// manageServiceRoutes watches services in given registry and updates the routing table accordingly.
// It returns error if the service registry watcher has stopped or if the routing table failed to be updated.
func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()
var watchErr error
for {
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
watchErr = err
}
break
}
route := Route{
Destination: res.Service.Name,
Router: r.opts.Address,
Network: r.opts.Network,
Metric: metric,
}
switch res.Action {
case "create":
// only return error if the route is not duplicate, but something else has failed
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed to add route for service %v: %s", res.Service.Name, err)
}
case "delete":
// only return error if the route is not in the table, but something else has failed
if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to delete route for service %v: %s", res.Service.Name, err)
}
}
}
return watchErr
}
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()
var watchErr error
exit:
for {
event, err := w.Next()
if err != nil {
if err != ErrWatcherStopped {
watchErr = err
}
break
}
u := &Update{
ID: r.ID(),
Timestamp: time.Now(),
Event: event,
}
select {
case <-r.exit:
break exit
case r.advertChan <- u:
}
}
// close the advertisement channel
close(r.advertChan)
return watchErr
}
// watchError watches router errors
func (r *router) watchError(errChan <-chan error) {
defer r.wg.Done()
var code StatusCode
var err error
select {
case <-r.exit:
code = Stopped
case err = <-errChan:
code = Error
}
r.Lock()
defer r.Unlock()
status := Status{
Code: code,
Error: err,
}
r.status = status
// stop the router if some error happened
if err != nil && code != Stopped {
// this will stop watchers which will close r.advertChan
close(r.exit)
// drain the advertise channel
for range r.advertChan {
}
}
}
// Advertise advertises the routes to the network.
// It returns error if any of the launched goroutines fail with error.
func (r *router) Advertise() (<-chan *Update, error) {
r.Lock()
defer r.Unlock()
if r.status.Code != Running {
// add local service routes into the routing table
if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil {
return nil, fmt.Errorf("failed adding routes: %v", err)
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Destination: "*",
Gateway: r.opts.Gateway,
Router: "*",
Network: "*",
Metric: DefaultLocalMetric,
}
if err := r.opts.Table.Add(route); err != nil {
return nil, fmt.Errorf("error to add default gateway route: %s", err)
}
}
// NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped
if r.status.Code == Error || r.status.Code == Stopped {
r.exit = make(chan struct{})
r.advertChan = make(chan *Update)
}
// routing table watcher which watches all routes i.e. to every destination
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*"))
if err != nil {
return nil, fmt.Errorf("failed to create routing table watcher: %v", err)
}
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
return nil, fmt.Errorf("failed to create registry watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 2)
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routing table
errChan <- r.watchTable(tableWatcher)
}()
r.wg.Add(1)
go r.watchError(errChan)
// mark router as running and set its Error to nil
status := Status{
Code: Running,
Error: nil,
}
r.status = status
}
return r.advertChan, nil
}
// Update updates the routing table using the advertised values
func (r *router) Update(a *Update) error {
// we extract the route from advertisement and update the routing table
route := Route{
Destination: a.Event.Route.Destination,
Gateway: a.Event.Route.Gateway,
Router: a.Event.Route.Router,
Network: a.Event.Route.Network,
Metric: a.Event.Route.Metric,
Policy: AddIfNotExists,
}
return r.opts.Table.Update(route)
}
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
// make a copy of the status
status := r.status
return status
}
// Stop stops the router
func (r *router) Stop() error {
r.RLock()
// only close the channel if the router is running
if r.status.Code == Running {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel
for range r.advertChan {
}
}
r.RUnlock()
// wait for all goroutines to finish
r.wg.Wait()
return nil
}
// String prints debugging information about router
func (r *router) String() string {
sb := &strings.Builder{}
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"ID", "Address", "Network", "Table", "Status"})
data := []string{
r.opts.ID,
r.opts.Address,
r.opts.Network,
fmt.Sprintf("%d", r.opts.Table.Size()),
r.status.Code.String(),
}
table.Append(data)
// render table into sb
table.Render()
return sb.String()
}

View File

@ -1,185 +0,0 @@
package router
import "testing"
// creates routing table and test route
func testSetup() (Table, Route) {
table := NewTable()
route := Route{
Destination: "dest.svc",
Gateway: "dest.gw",
Router: "dest.router",
Network: "dest.network",
Metric: 10,
}
return table, route
}
func TestAdd(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// adds new route for the original destination
route.Gateway = "dest.gw2"
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// overrides an existing route
// NOTE: the size of the table should not change
route.Metric = 100
route.Policy = OverrideIfExists
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
// dont add new route if it already exists
// NOTE: The size of the table should not change
route.Policy = IgnoreIfExists
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
// adding the same route under AddIfNotExists policy must error
route.Policy = AddIfNotExists
if err := table.Add(route); err != ErrDuplicateRoute {
t.Errorf("error adding route. Expected error: %s, Given: %s", ErrDuplicateRoute, err)
}
}
func TestDelete(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// should fail to delete non-existant route
prevDest := route.Destination
route.Destination = "randDest"
if err := table.Delete(route); err != ErrRouteNotFound {
t.Errorf("error deleting route. Expected error: %s, given: %s", ErrRouteNotFound, err)
}
// we should be able to delete the existing route
route.Destination = prevDest
if err := table.Delete(route); err != nil {
t.Errorf("error deleting route: %s", err)
}
testTableSize -= 1
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
}
func TestUpdate(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// change the metric of the original route
// NOTE: this should NOT change the size of the table
route.Metric = 200
if err := table.Update(route); err != nil {
t.Errorf("error updating route: %s", err)
}
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
// NOTE: routing table routes on <destination, gateway, network>
// this should add a new route
route.Destination = "new.dest"
if err := table.Update(route); err != nil {
t.Errorf("error updating route: %s", err)
}
testTableSize += 1
// NOTE: default policy is AddIfNotExists so the new route will be added here
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
// NOTE: we are hashing routes on <destination, gateway, network>
// this should add a new route
route.Gateway = "new.gw"
if err := table.Update(route); err != nil {
t.Errorf("error updating route: %s", err)
}
testTableSize += 1
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
// this should NOT add a new route as we are setting the policy to IgnoreIfExists
route.Destination = "rand.dest"
route.Policy = IgnoreIfExists
if err := table.Update(route); err != ErrRouteNotFound {
t.Errorf("error updating route. Expected error: %s, given: %s", ErrRouteNotFound, err)
}
if table.Size() != 3 {
t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size())
}
}
func TestList(t *testing.T) {
table, route := testSetup()
dest := []string{"one.svc", "two.svc", "three.svc"}
for i := 0; i < len(dest); i++ {
route.Destination = dest[i]
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
}
routes, err := table.List()
if err != nil {
t.Errorf("error listing routes: %s", err)
}
if len(routes) != len(dest) {
t.Errorf("incorrect number of routes listed. Expected: %d, Given: %d", len(dest), len(routes))
}
if len(routes) != table.Size() {
t.Errorf("mismatch number of routes and table size. Routes: %d, Size: %d", len(routes), table.Size())
}
}

View File

@ -2,6 +2,7 @@ package router
import ( import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
) )
@ -9,7 +10,7 @@ var (
// DefaultAddress is default router address // DefaultAddress is default router address
DefaultAddress = ":9093" DefaultAddress = ":9093"
// DefaultNetwork is default micro network // DefaultNetwork is default micro network
DefaultNetwork = "local" DefaultNetwork = "go.micro"
) )
// Options are router options // Options are router options
@ -18,14 +19,14 @@ type Options struct {
ID string ID string
// Address is router address // Address is router address
Address string Address string
// Network is micro network
Network string
// Gateway is micro network gateway // Gateway is micro network gateway
Gateway string Gateway string
// Network is micro network
Network string
// Registry is the local registry // Registry is the local registry
Registry registry.Registry Registry registry.Registry
// Table is routing table // Table is routing table
Table Table Table table.Table
} }
// ID sets Router ID // ID sets Router ID
@ -42,13 +43,6 @@ func Address(a string) Option {
} }
} }
// Network sets router network
func Network(n string) Option {
return func(o *Options) {
o.Network = n
}
}
// Gateway sets network gateway // Gateway sets network gateway
func Gateway(g string) Option { func Gateway(g string) Option {
return func(o *Options) { return func(o *Options) {
@ -56,8 +50,15 @@ func Gateway(g string) Option {
} }
} }
// Network sets router network
func Network(n string) Option {
return func(o *Options) {
o.Network = n
}
}
// RoutingTable sets the routing table // RoutingTable sets the routing table
func RoutingTable(t Table) Option { func RoutingTable(t table.Table) Option {
return func(o *Options) { return func(o *Options) {
o.Table = t o.Table = t
} }
@ -77,6 +78,6 @@ func DefaultOptions() Options {
Address: DefaultAddress, Address: DefaultAddress,
Network: DefaultNetwork, Network: DefaultNetwork,
Registry: registry.DefaultRegistry, Registry: registry.DefaultRegistry,
Table: NewTable(), Table: table.NewTable(),
} }
} }

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT. // Code generated by protoc-gen-micro. DO NOT EDIT.
// source: go-micro/network/router/proto/router.proto // source: router.proto
package router package router

View File

@ -1,13 +1,11 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// source: go-micro/network/router/proto/router.proto // source: router.proto
package router package router
import ( import (
context "context"
fmt "fmt" fmt "fmt"
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math" math "math"
) )
@ -34,7 +32,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} }
func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (m *LookupRequest) String() string { return proto.CompactTextString(m) }
func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) ProtoMessage() {}
func (*LookupRequest) Descriptor() ([]byte, []int) { func (*LookupRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{0} return fileDescriptor_367072455c71aedc, []int{0}
} }
func (m *LookupRequest) XXX_Unmarshal(b []byte) error { func (m *LookupRequest) XXX_Unmarshal(b []byte) error {
@ -74,7 +72,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} }
func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (m *LookupResponse) String() string { return proto.CompactTextString(m) }
func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) ProtoMessage() {}
func (*LookupResponse) Descriptor() ([]byte, []int) { func (*LookupResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{1} return fileDescriptor_367072455c71aedc, []int{1}
} }
func (m *LookupResponse) XXX_Unmarshal(b []byte) error { func (m *LookupResponse) XXX_Unmarshal(b []byte) error {
@ -104,8 +102,8 @@ func (m *LookupResponse) GetRoutes() []*Route {
// Query is passed in a LookupRequest // Query is passed in a LookupRequest
type Query struct { type Query struct {
// destination to lookup // service to lookup
Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -115,7 +113,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) } func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {} func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{2} return fileDescriptor_367072455c71aedc, []int{2}
} }
func (m *Query) XXX_Unmarshal(b []byte) error { func (m *Query) XXX_Unmarshal(b []byte) error {
@ -136,9 +134,9 @@ func (m *Query) XXX_DiscardUnknown() {
var xxx_messageInfo_Query proto.InternalMessageInfo var xxx_messageInfo_Query proto.InternalMessageInfo
func (m *Query) GetDestination() string { func (m *Query) GetService() string {
if m != nil { if m != nil {
return m.Destination return m.Service
} }
return "" return ""
} }
@ -146,15 +144,17 @@ func (m *Query) GetDestination() string {
// Route is a service route // Route is a service route
type Route struct { type Route struct {
// service for the route // service for the route
Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
// the address that advertise this route
Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"`
// gateway as the next hop // gateway as the next hop
Gateway string `protobuf:"bytes,2,opt,name=gateway,proto3" json:"gateway,omitempty"` Gateway string `protobuf:"bytes,3,opt,name=gateway,proto3" json:"gateway,omitempty"`
// the router that advertise this route
Router string `protobuf:"bytes,3,opt,name=router,proto3" json:"router,omitempty"`
// the network for this destination // the network for this destination
Network string `protobuf:"bytes,4,opt,name=network,proto3" json:"network,omitempty"` Network string `protobuf:"bytes,4,opt,name=network,proto3" json:"network,omitempty"`
// the network link
Link string `protobuf:"bytes,5,opt,name=link,proto3" json:"link,omitempty"`
// the metric / score of this route // the metric / score of this route
Metric int64 `protobuf:"varint,5,opt,name=metric,proto3" json:"metric,omitempty"` Metric int64 `protobuf:"varint,6,opt,name=metric,proto3" json:"metric,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -164,7 +164,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) } func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {} func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) { func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{3} return fileDescriptor_367072455c71aedc, []int{3}
} }
func (m *Route) XXX_Unmarshal(b []byte) error { func (m *Route) XXX_Unmarshal(b []byte) error {
@ -185,9 +185,16 @@ func (m *Route) XXX_DiscardUnknown() {
var xxx_messageInfo_Route proto.InternalMessageInfo var xxx_messageInfo_Route proto.InternalMessageInfo
func (m *Route) GetDestination() string { func (m *Route) GetService() string {
if m != nil { if m != nil {
return m.Destination return m.Service
}
return ""
}
func (m *Route) GetAddress() string {
if m != nil {
return m.Address
} }
return "" return ""
} }
@ -199,16 +206,16 @@ func (m *Route) GetGateway() string {
return "" return ""
} }
func (m *Route) GetRouter() string { func (m *Route) GetNetwork() string {
if m != nil { if m != nil {
return m.Router return m.Network
} }
return "" return ""
} }
func (m *Route) GetNetwork() string { func (m *Route) GetLink() string {
if m != nil { if m != nil {
return m.Network return m.Link
} }
return "" return ""
} }
@ -227,98 +234,23 @@ func init() {
proto.RegisterType((*Route)(nil), "Route") proto.RegisterType((*Route)(nil), "Route")
} }
func init() { func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) }
proto.RegisterFile("go-micro/network/router/proto/router.proto", fileDescriptor_fc08514fc6dadd29)
} var fileDescriptor_367072455c71aedc = []byte{
// 238 bytes of a gzipped FileDescriptorProto
var fileDescriptor_fc08514fc6dadd29 = []byte{ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xc1, 0x4a, 0xc4, 0x30,
// 242 bytes of a gzipped FileDescriptorProto 0x10, 0x86, 0x8d, 0xdd, 0x46, 0x1c, 0x75, 0x85, 0x1c, 0x24, 0x88, 0x48, 0xcd, 0x69, 0x41, 0x2c,
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xc1, 0x4a, 0xc3, 0x40, 0xb2, 0xe2, 0x5b, 0x78, 0x31, 0x6f, 0x50, 0x77, 0x07, 0x29, 0xd5, 0xa6, 0x3b, 0x49, 0x5c, 0xf6,
0x10, 0x86, 0x5d, 0x63, 0x56, 0x9c, 0x62, 0x85, 0x3d, 0xc8, 0x22, 0x22, 0x61, 0x4f, 0x55, 0x69, 0x59, 0x7c, 0x59, 0xc9, 0x24, 0x7b, 0xe8, 0xc1, 0x5b, 0xbf, 0xf9, 0x66, 0x7e, 0x9a, 0x1f, 0x2e,
0x22, 0x15, 0xdf, 0xc2, 0x8b, 0xfb, 0x06, 0xb1, 0x0e, 0x25, 0x94, 0x66, 0xd2, 0xdd, 0x09, 0xa5, 0xc9, 0xc5, 0x80, 0xd4, 0x4e, 0xe4, 0x82, 0x33, 0x4f, 0x70, 0xf5, 0xe6, 0xdc, 0x10, 0x27, 0x8b,
0x0f, 0xe1, 0x3b, 0x4b, 0x26, 0x5b, 0x30, 0xa7, 0x1e, 0xbf, 0x99, 0xf9, 0x7e, 0x76, 0x7f, 0x78, 0xbb, 0x88, 0x3e, 0xa8, 0x3b, 0xa8, 0x77, 0x11, 0xe9, 0xa0, 0x45, 0x23, 0x56, 0x17, 0x6b, 0xd9,
0xd9, 0xd0, 0x72, 0xd7, 0xac, 0x03, 0x55, 0x2d, 0xf2, 0x81, 0xc2, 0xb6, 0x0a, 0xd4, 0x33, 0x86, 0xbe, 0x27, 0xb2, 0x79, 0x68, 0x9e, 0x61, 0x79, 0x5c, 0xf7, 0x93, 0x1b, 0x3d, 0xaa, 0x7b, 0x90,
0xaa, 0x0b, 0xc4, 0x94, 0xa0, 0x14, 0x70, 0x4b, 0xb8, 0xfd, 0x24, 0xda, 0xf6, 0x9d, 0xc7, 0x7d, 0x1c, 0xe8, 0xb5, 0x68, 0x2a, 0x3e, 0xb0, 0x09, 0x6d, 0x99, 0x9a, 0x07, 0xa8, 0x39, 0x41, 0x69,
0x8f, 0x91, 0xcd, 0x23, 0xe4, 0xfb, 0x1e, 0xc3, 0xd1, 0xaa, 0x42, 0x2d, 0x66, 0x2b, 0x5d, 0x7e, 0x38, 0xf3, 0x48, 0x3f, 0xfd, 0x06, 0x39, 0xfa, 0xdc, 0x1e, 0xd1, 0xfc, 0x0a, 0xa8, 0xf9, 0xe8,
0x0d, 0xe4, 0xc7, 0xa1, 0x7b, 0x83, 0xf9, 0xe9, 0x3c, 0x76, 0xd4, 0x46, 0x34, 0x4f, 0xa0, 0x25, 0xff, 0x9d, 0x64, 0xba, 0xed, 0x96, 0xd0, 0x7b, 0x7d, 0x9a, 0x4d, 0xc1, 0x64, 0x3e, 0xbb, 0x80,
0x30, 0x5a, 0x55, 0x64, 0x22, 0xf8, 0x01, 0x7d, 0x9a, 0xba, 0x67, 0xc8, 0x25, 0xc1, 0x14, 0x30, 0xfb, 0xee, 0xa0, 0xab, 0x6c, 0x0a, 0x26, 0x33, 0x62, 0xd8, 0x3b, 0x1a, 0xf4, 0x22, 0x9b, 0x82,
0xfb, 0xc1, 0xc8, 0x4d, 0x5b, 0x73, 0x43, 0xad, 0xc4, 0xdf, 0xf8, 0xff, 0x23, 0xf7, 0xab, 0x20, 0x4a, 0xc1, 0xe2, 0xab, 0x1f, 0x07, 0x5d, 0xf3, 0x98, 0xbf, 0xd5, 0x0d, 0xc8, 0x6f, 0x0c, 0xd4,
0x17, 0xf9, 0xfc, 0xad, 0xb1, 0x70, 0xbd, 0xa9, 0x19, 0x0f, 0xf5, 0xd1, 0x5e, 0xca, 0xf6, 0x84, 0x6f, 0xb4, 0x6c, 0xc4, 0xaa, 0xb2, 0x85, 0xd6, 0xaf, 0x20, 0xf9, 0xe7, 0x48, 0x3d, 0x82, 0xcc,
0xe6, 0x3e, 0x3d, 0x28, 0xd8, 0x4c, 0x16, 0x89, 0x06, 0x23, 0xd5, 0x61, 0xaf, 0x46, 0x23, 0xe1, 0x8f, 0x57, 0xcb, 0x76, 0x56, 0xda, 0xed, 0x75, 0x3b, 0x6f, 0xc5, 0x9c, 0x7c, 0x48, 0xee, 0xf7,
0x60, 0xec, 0x90, 0x43, 0xb3, 0xb6, 0x79, 0xa1, 0x16, 0x99, 0x4f, 0xb4, 0xfa, 0x00, 0xed, 0x47, 0xe5, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x99, 0xfb, 0x2d, 0x6f, 0x01, 0x00, 0x00,
0xf7, 0x15, 0xf4, 0xf8, 0x6d, 0x33, 0x2f, 0x27, 0x75, 0x3d, 0xdc, 0x95, 0xd3, 0x3e, 0xdc, 0xc5,
0xb7, 0x96, 0x66, 0xdf, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x4d, 0x73, 0x18, 0x9e, 0x87, 0x01,
0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// RouterClient is the client API for Router service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RouterClient interface {
Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error)
}
type routerClient struct {
cc *grpc.ClientConn
}
func NewRouterClient(cc *grpc.ClientConn) RouterClient {
return &routerClient{cc}
}
func (c *routerClient) Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) {
out := new(LookupResponse)
err := c.cc.Invoke(ctx, "/Router/Lookup", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RouterServer is the server API for Router service.
type RouterServer interface {
Lookup(context.Context, *LookupRequest) (*LookupResponse, error)
}
func RegisterRouterServer(s *grpc.Server, srv RouterServer) {
s.RegisterService(&_Router_serviceDesc, srv)
}
func _Router_Lookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LookupRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).Lookup(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Router/Lookup",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).Lookup(ctx, req.(*LookupRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Router_serviceDesc = grpc.ServiceDesc{
ServiceName: "Router",
HandlerType: (*RouterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Lookup",
Handler: _Router_Lookup_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "go-micro/network/router/proto/router.proto",
} }

View File

@ -17,20 +17,22 @@ message LookupResponse {
// Query is passed in a LookupRequest // Query is passed in a LookupRequest
message Query { message Query {
// destination to lookup // service to lookup
string destination = 1; string service = 1;
} }
// Route is a service route // Route is a service route
message Route { message Route {
// service for the route // service for the route
string destination = 1; string service = 1;
// the address that advertise this route
string address = 2;
// gateway as the next hop // gateway as the next hop
string gateway = 2; string gateway = 3;
// the router that advertise this route
string router = 3;
// the network for this destination // the network for this destination
string network = 4; string network = 4;
// the network link
string link = 5;
// the metric / score of this route // the metric / score of this route
int64 metric = 5; int64 metric = 6;
} }

View File

@ -1,81 +0,0 @@
package router
import (
"fmt"
"strings"
"github.com/olekukonko/tablewriter"
)
var (
// DefaultLocalMetric is default route cost for local network
DefaultLocalMetric = 1
// DefaultNetworkMetric is default route cost for micro network
DefaultNetworkMetric = 10
)
// RoutePolicy defines routing table addition policy
type RoutePolicy int
const (
// AddIfNotExist adds the route if it does not exist
AddIfNotExists RoutePolicy = iota
// OverrideIfExists overrides route if it already exists
OverrideIfExists
// IgnoreIfExists instructs to not modify existing route
IgnoreIfExists
)
// String returns human reprensentation of policy
func (p RoutePolicy) String() string {
switch p {
case AddIfNotExists:
return "ADD_IF_NOT_EXISTS"
case OverrideIfExists:
return "OVERRIDE_IF_EXISTS"
case IgnoreIfExists:
return "IGNORE_IF_EXISTS"
default:
return "UNKNOWN"
}
}
// Route is network route
type Route struct {
// Destination is destination address
Destination string
// Gateway is route gateway
Gateway string
// Router is the network router address
Router string
// Network is micro network address
Network string
// Metric is the route cost metric
Metric int
// Policy defines route policy
Policy RoutePolicy
}
// String allows to print the route
func (r *Route) String() string {
// this will help us build routing table string
sb := &strings.Builder{}
// create nice table printing structure
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"})
strRoute := []string{
r.Destination,
r.Gateway,
r.Router,
r.Network,
fmt.Sprintf("%d", r.Metric),
}
table.Append(strRoute)
// render table into sb
table.Render()
return sb.String()
}

View File

@ -1,7 +1,11 @@
// Package router provides a network routing control plane // Package router provides a network routing control plane
package router package router
import "time" import (
"time"
"github.com/micro/go-micro/network/router/table"
)
var ( var (
// DefaultRouter is default network router // DefaultRouter is default network router
@ -14,18 +18,18 @@ type Router interface {
Init(...Option) error Init(...Option) error
// Options returns the router options // Options returns the router options
Options() Options Options() Options
// ID returns the id of the router // ID returns the ID of the router
ID() string ID() string
// Table returns the routing table
Table() Table
// Address returns the router adddress // Address returns the router adddress
Address() string Address() string
// Network returns the network address of the router // Network returns the network address of the router
Network() string Network() string
// Advertise starts advertising routes to the network // Table returns the routing table
Advertise() (<-chan *Update, error) Table() table.Table
// Advertise advertises routes to the network
Advertise() (<-chan *Advert, error)
// Update updates the routing table // Update updates the routing table
Update(*Update) error Update(*Advert) error
// Status returns router status // Status returns router status
Status() Status Status() Status
// Stop stops the router // Stop stops the router
@ -34,14 +38,44 @@ type Router interface {
String() string String() string
} }
// Update is sent by the router to the network // Option used by the router
type Update struct { type Option func(*Options)
// AdvertType is route advertisement type
type AdvertType int
const (
// Announce is advertised when the router announces itself
Announce AdvertType = iota
// Update advertises route updates
Update
)
// String returns string representation of update event
func (at AdvertType) String() string {
switch at {
case Announce:
return "ANNOUNCE"
case Update:
return "UPDATE"
default:
return "UNKNOWN"
}
}
// Advert contains a list of events advertised by the router to the network
type Advert struct {
// ID is the router ID // ID is the router ID
ID string ID string
// Timestamp marks the time when update is sent // Type is type of advert
Type AdvertType
// Timestamp marks the time when the update is sent
Timestamp time.Time Timestamp time.Time
// Event defines advertisement even // TTL is Advert TTL
Event *Event // TODO: not used
TTL time.Time
// Events is a list of routing table events to advertise
Events []*table.Event
} }
// StatusCode defines router status // StatusCode defines router status
@ -56,35 +90,28 @@ type Status struct {
} }
const ( const (
// Init means the rotuer has just been initialized // Running means the router is up and running
Init StatusCode = iota Running StatusCode = iota
// Running means the router is running // Stopped means the router has been stopped
Running
// Error means the router has crashed with error
Error
// Stopped means the router has stopped
Stopped Stopped
// Error means the router has encountered error
Error
) )
// String returns human readable status code // String returns human readable status code
func (sc StatusCode) String() string { func (sc StatusCode) String() string {
switch sc { switch sc {
case Init:
return "INITIALIZED"
case Running: case Running:
return "RUNNING" return "RUNNING"
case Error:
return "ERROR"
case Stopped: case Stopped:
return "STOPPED" return "STOPPED"
case Error:
return "ERROR"
default: default:
return "UNKNOWN" return "UNKNOWN"
} }
} }
// Option used by the router
type Option func(*Options)
// NewRouter creates new Router and returns it // NewRouter creates new Router and returns it
func NewRouter(opts ...Option) Router { func NewRouter(opts ...Option) Router {
return newRouter(opts...) return newRouter(opts...)

View File

@ -1,34 +1,27 @@
package router package table
import ( import (
"fmt"
"hash"
"hash/fnv"
"strings"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/olekukonko/tablewriter"
) )
// TableOptions are routing table options // TableOptions specify routing table options
// TODO: table options TBD in the future // TODO: table options TBD in the future
type TableOptions struct{} type TableOptions struct{}
// table is in memory routing table // table is an in memory routing table
type table struct { type table struct {
// opts are table options // opts are table options
opts TableOptions opts TableOptions
// m stores routing table map // m stores routing table map
m map[string]map[uint64]Route m map[string]map[uint64]Route
// h hashes route entries
h hash.Hash64
// w is a list of table watchers // w is a list of table watchers
w map[string]*tableWatcher w map[string]*tableWatcher
sync.RWMutex sync.RWMutex
} }
// newTable creates in memory routing table and returns it // newTable creates a new routing table and returns it
func newTable(opts ...TableOption) Table { func newTable(opts ...TableOption) Table {
// default options // default options
var options TableOptions var options TableOptions
@ -38,14 +31,10 @@ func newTable(opts ...TableOption) Table {
o(&options) o(&options)
} }
h := fnv.New64()
h.Reset()
return &table{ return &table{
opts: options, opts: options,
m: make(map[string]map[uint64]Route), m: make(map[string]map[uint64]Route),
w: make(map[string]*tableWatcher), w: make(map[string]*tableWatcher),
h: h,
} }
} }
@ -64,37 +53,24 @@ func (t *table) Options() TableOptions {
// Add adds a route to the routing table // Add adds a route to the routing table
func (t *table) Add(r Route) error { func (t *table) Add(r Route) error {
destAddr := r.Destination service := r.Service
sum := t.hash(r) sum := r.Hash()
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
// check if the destination has any routes in the table // check if there are any routes in the table for the route destination
if _, ok := t.m[destAddr]; !ok { if _, ok := t.m[service]; !ok {
t.m[destAddr] = make(map[uint64]Route) t.m[service] = make(map[uint64]Route)
t.m[destAddr][sum] = r t.m[service][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r}) go t.sendEvent(&Event{Type: Insert, Route: r})
return nil return nil
} }
// add new route to the table for the given destination // add new route to the table for the route destination
if _, ok := t.m[destAddr][sum]; !ok { if _, ok := t.m[service][sum]; !ok {
t.m[destAddr][sum] = r t.m[service][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r}) go t.sendEvent(&Event{Type: Insert, Route: r})
return nil
}
// only add the route if the route override is explicitly requested
if _, ok := t.m[destAddr][sum]; ok && r.Policy == OverrideIfExists {
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r})
return nil
}
// if we reached this point without already returning the route already exists
// we return nil only if explicitly requested by the client
if r.Policy == IgnoreIfExists {
return nil return nil
} }
@ -103,51 +79,39 @@ func (t *table) Add(r Route) error {
// Delete deletes the route from the routing table // Delete deletes the route from the routing table
func (t *table) Delete(r Route) error { func (t *table) Delete(r Route) error {
service := r.Service
sum := r.Hash()
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
destAddr := r.Destination if _, ok := t.m[service]; !ok {
sum := t.hash(r)
if _, ok := t.m[destAddr]; !ok {
return ErrRouteNotFound return ErrRouteNotFound
} }
delete(t.m[destAddr], sum) delete(t.m[service], sum)
go t.sendEvent(&Event{Type: DeleteEvent, Route: r}) go t.sendEvent(&Event{Type: Delete, Route: r})
return nil return nil
} }
// Update updates routing table with new route // Update updates routing table with the new route
func (t *table) Update(r Route) error { func (t *table) Update(r Route) error {
destAddr := r.Destination service := r.Service
sum := t.hash(r) sum := r.Hash()
t.Lock() t.Lock()
defer t.Unlock() defer t.Unlock()
// check if the destAddr has ANY routes in the table // check if the route destination has any routes in the table
if _, ok := t.m[destAddr]; !ok { if _, ok := t.m[service]; !ok {
if r.Policy == AddIfNotExists {
t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})
return nil
}
return ErrRouteNotFound return ErrRouteNotFound
} }
if _, ok := t.m[destAddr][sum]; !ok && r.Policy == AddIfNotExists {
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})
return nil
}
// if the route has been found update it // if the route has been found update it
if _, ok := t.m[destAddr][sum]; ok { if _, ok := t.m[service][sum]; ok {
t.m[destAddr][sum] = r t.m[service][sum] = r
go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) go t.sendEvent(&Event{Type: Update, Route: r})
return nil return nil
} }
@ -172,7 +136,7 @@ func (t *table) List() ([]Route, error) {
// isMatch checks if the route matches given network and router // isMatch checks if the route matches given network and router
func isMatch(route Route, network, router string) bool { func isMatch(route Route, network, router string) bool {
if network == "*" || network == route.Network { if network == "*" || network == route.Network {
if router == "*" || router == route.Router { if router == "*" || router == route.Gateway {
return true return true
} }
} }
@ -195,18 +159,18 @@ func (t *table) Lookup(q Query) ([]Route, error) {
t.RLock() t.RLock()
defer t.RUnlock() defer t.RUnlock()
if q.Options().Destination != "*" { if q.Options().Service != "*" {
// no routes found for the destination and query policy is not a DiscardIfNone // no routes found for the destination and query policy is not a DiscardIfNone
if _, ok := t.m[q.Options().Destination]; !ok && q.Options().Policy != DiscardIfNone { if _, ok := t.m[q.Options().Service]; !ok && q.Options().Policy != DiscardIfNone {
return nil, ErrRouteNotFound return nil, ErrRouteNotFound
} }
return findRoutes(t.m[q.Options().Destination], q.Options().Network, q.Options().Router), nil return findRoutes(t.m[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
} }
var results []Route var results []Route
// search through all destinations // search through all destinations
for _, routes := range t.m { for _, routes := range t.m {
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Router)...) results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
} }
return results, nil return results, nil
@ -216,7 +180,7 @@ func (t *table) Lookup(q Query) ([]Route, error) {
func (t *table) Watch(opts ...WatchOption) (Watcher, error) { func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
// by default watch everything // by default watch everything
wopts := WatchOptions{ wopts := WatchOptions{
Destination: "*", Service: "*",
} }
for _, o := range opts { for _, o := range opts {
@ -263,40 +227,6 @@ func (t *table) Size() int {
} }
// String returns debug information // String returns debug information
func (t *table) String() string { func (t table) String() string {
t.RLock() return "table"
defer t.RUnlock()
// this will help us build routing table string
sb := &strings.Builder{}
// create nice table printing structure
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"})
for _, destRoute := range t.m {
for _, route := range destRoute {
strRoute := []string{
route.Destination,
route.Gateway,
route.Router,
route.Network,
fmt.Sprintf("%d", route.Metric),
}
table.Append(strRoute)
}
}
// render table into sb
table.Render()
return sb.String()
}
// hash hashes the route using router gateway and network address
func (t *table) hash(r Route) uint64 {
t.h.Reset()
t.h.Write([]byte(r.Destination + r.Gateway + r.Network))
return t.h.Sum64()
} }

View File

@ -0,0 +1,227 @@
package table
import "testing"
func testSetup() (Table, Route) {
table := NewTable()
route := Route{
Service: "dest.svc",
Gateway: "dest.gw",
Network: "dest.network",
Link: "det.link",
Metric: 10,
}
return table, route
}
func TestAdd(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// adds new route for the original destination
route.Gateway = "dest.gw2"
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
// adding the same route under Insert policy must error
if err := table.Add(route); err != ErrDuplicateRoute {
t.Errorf("error adding route. Expected error: %s, found: %s", ErrDuplicateRoute, err)
}
}
func TestDelete(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// should fail to delete non-existant route
prevSvc := route.Service
route.Service = "randDest"
if err := table.Delete(route); err != ErrRouteNotFound {
t.Errorf("error deleting route. Expected: %s, found: %s", ErrRouteNotFound, err)
}
// we should be able to delete the existing route
route.Service = prevSvc
if err := table.Delete(route); err != nil {
t.Errorf("error deleting route: %s", err)
}
testTableSize -= 1
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
}
func TestUpdate(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize += 1
// change the metric of the original route
route.Metric = 200
if err := table.Update(route); err != nil {
t.Errorf("error updating route: %s", err)
}
// the size of the table should not change as we're only updating the metric of an existing route
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
// this should error as the destination does not exist
route.Service = "rand.dest"
if err := table.Update(route); err != ErrRouteNotFound {
t.Errorf("error updating route. Expected error: %s, found: %s", ErrRouteNotFound, err)
}
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
}
func TestList(t *testing.T) {
table, route := testSetup()
svc := []string{"one.svc", "two.svc", "three.svc"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
}
routes, err := table.List()
if err != nil {
t.Errorf("error listing routes: %s", err)
}
if len(routes) != len(svc) {
t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes))
}
if len(routes) != table.Size() {
t.Errorf("mismatch number of routes and table size. Expected: %d, found: %d", len(routes), table.Size())
}
}
func TestLookup(t *testing.T) {
table, route := testSetup()
svc := []string{"svc1", "svc2", "svc3"}
net := []string{"net1", "net2", "net1"}
gw := []string{"gw1", "gw2", "gw3"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
if err := table.Add(route); err != nil {
t.Errorf("error adding route: %s", err)
}
}
// return all routes
query := NewQuery()
routes, err := table.Lookup(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != table.Size() {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", table.Size(), len(routes))
}
// query particular net
query = NewQuery(QueryNetwork("net1"))
routes, err = table.Lookup(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
}
// query particular gateway
gateway := "gw1"
query = NewQuery(QueryGateway(gateway))
routes, err = table.Lookup(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
// query particular route
network := "net1"
query = NewQuery(
QueryGateway(gateway),
QueryNetwork(network),
)
routes, err = table.Lookup(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
if routes[0].Network != network {
t.Errorf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
}
// bullshit route query
query = NewQuery(QueryService("foobar"))
routes, err = table.Lookup(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != 0 {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
}

View File

@ -1,11 +1,4 @@
package router package table
import (
"fmt"
"strings"
"github.com/olekukonko/tablewriter"
)
// LookupPolicy defines query policy // LookupPolicy defines query policy
type LookupPolicy int type LookupPolicy int
@ -34,34 +27,34 @@ type QueryOption func(*QueryOptions)
// QueryOptions are routing table query options // QueryOptions are routing table query options
type QueryOptions struct { type QueryOptions struct {
// Destination is destination address // Service is destination service name
Destination string Service string
// Gateway is route gateway
Gateway string
// Network is network address // Network is network address
Network string Network string
// Router is router address
Router string
// Policy is query lookup policy // Policy is query lookup policy
Policy LookupPolicy Policy LookupPolicy
} }
// QueryDestination sets destination address // QueryService sets destination address
func QueryDestination(d string) QueryOption { func QueryService(s string) QueryOption {
return func(o *QueryOptions) { return func(o *QueryOptions) {
o.Destination = d o.Service = s
}
}
// QueryGateway sets route gateway
func QueryGateway(g string) QueryOption {
return func(o *QueryOptions) {
o.Gateway = g
} }
} }
// QueryNetwork sets route network address // QueryNetwork sets route network address
func QueryNetwork(a string) QueryOption { func QueryNetwork(n string) QueryOption {
return func(o *QueryOptions) { return func(o *QueryOptions) {
o.Network = a o.Network = n
}
}
// QueryRouter sets route router address
func QueryRouter(r string) QueryOption {
return func(o *QueryOptions) {
o.Router = r
} }
} }
@ -89,9 +82,10 @@ func NewQuery(opts ...QueryOption) Query {
// default options // default options
// NOTE: by default we use DefaultNetworkMetric // NOTE: by default we use DefaultNetworkMetric
qopts := QueryOptions{ qopts := QueryOptions{
Destination: "*", Service: "*",
Network: "*", Gateway: "*",
Policy: DiscardIfNone, Network: "*",
Policy: DiscardIfNone,
} }
for _, o := range opts { for _, o := range opts {
@ -110,23 +104,5 @@ func (q *query) Options() QueryOptions {
// String prints routing table query in human readable form // String prints routing table query in human readable form
func (q query) String() string { func (q query) String() string {
// this will help us build routing table string return "query"
sb := &strings.Builder{}
// create nice table printing structure
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination", "Network", "Router", "Policy"})
strQuery := []string{
q.opts.Destination,
q.opts.Network,
q.opts.Router,
fmt.Sprintf("%s", q.opts.Policy),
}
table.Append(strQuery)
// render table into sb
table.Render()
return sb.String()
} }

View File

@ -0,0 +1,44 @@
package table
import (
"hash/fnv"
)
var (
// DefaultLink is default network link
DefaultLink = "local"
// DefaultLocalMetric is default route cost metric for the local network
DefaultLocalMetric = 1
// DefaultNetworkMetric is default route cost metric for the micro network
DefaultNetworkMetric = 10
)
// Route is network route
type Route struct {
// Service is destination service name
Service string
// Address is service node address
Address string
// Gateway is route gateway
Gateway string
// Network is network address
Network string
// Link is network link
Link string
// Metric is the route cost metric
Metric int
}
// Hash returns route hash sum.
func (r *Route) Hash() uint64 {
h := fnv.New64()
h.Reset()
h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Link))
return h.Sum64()
}
// String returns human readable route
func (r Route) String() string {
return "route"
}

View File

@ -1,4 +1,4 @@
package router package table
import ( import (
"errors" "errors"

View File

@ -1,10 +1,9 @@
package router package table
import ( import (
"errors" "errors"
"strings" "fmt"
"time"
"github.com/olekukonko/tablewriter"
) )
var ( var (
@ -16,22 +15,22 @@ var (
type EventType int type EventType int
const ( const (
// CreateEvent is emitted when new route has been created // Insert is emitted when a new route has been inserted
CreateEvent EventType = iota Insert EventType = iota
// DeleteEvent is emitted when an existing route has been deleted // Delete is emitted when an existing route has been deleted
DeleteEvent Delete
// UpdateEvent is emitted when a routing table has been updated // Update is emitted when an existing route has been updated
UpdateEvent Update
) )
// String returns string representation of the event // String returns string representation of the event
func (et EventType) String() string { func (et EventType) String() string {
switch et { switch et {
case CreateEvent: case Insert:
return "CREATE" return "INSERT"
case DeleteEvent: case Delete:
return "DELETE" return "DELETE"
case UpdateEvent: case Update:
return "UPDATE" return "UPDATE"
default: default:
return "UNKNOWN" return "UNKNOWN"
@ -42,10 +41,17 @@ func (et EventType) String() string {
type Event struct { type Event struct {
// Type defines type of event // Type defines type of event
Type EventType Type EventType
// Route is table rout // Timestamp is event timestamp
Timestamp time.Time
// Route is table route
Route Route Route Route
} }
// String prints human readable Event
func (e Event) String() string {
return fmt.Sprintf("[EVENT] time: %s type: %s", e.Timestamp, e.Type)
}
// WatchOption is used to define what routes to watch in the table // WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions) type WatchOption func(*WatchOptions)
@ -62,15 +68,15 @@ type Watcher interface {
// WatchOptions are table watcher options // WatchOptions are table watcher options
type WatchOptions struct { type WatchOptions struct {
// Specify destination address to watch // Service allows to watch specific service routes
Destination string Service string
} }
// WatchDestination sets what destination to watch // WatchService sets what service routes to watch
// Destination is usually microservice name // Service is the microservice name
func WatchDestination(d string) WatchOption { func WatchService(s string) WatchOption {
return func(o *WatchOptions) { return func(o *WatchOptions) {
o.Destination = d o.Service = s
} }
} }
@ -81,18 +87,16 @@ type tableWatcher struct {
} }
// Next returns the next noticed action taken on table // Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly; we only allow watching particular route destination for now // TODO: this needs to be thought through properly;
// right now we only allow to watch destination
func (w *tableWatcher) Next() (*Event, error) { func (w *tableWatcher) Next() (*Event, error) {
for { for {
select { select {
case res := <-w.resChan: case res := <-w.resChan:
switch w.opts.Destination { switch w.opts.Service {
case "*", "": case res.Route.Service, "*":
return res, nil return res, nil
default: default:
if w.opts.Destination == res.Route.Destination {
return res, nil
}
continue continue
} }
case <-w.done: case <-w.done:
@ -117,19 +121,6 @@ func (w *tableWatcher) Stop() {
} }
// String prints debug information // String prints debug information
func (w *tableWatcher) String() string { func (w tableWatcher) String() string {
sb := &strings.Builder{} return "watcher"
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"Destination"})
data := []string{
w.opts.Destination,
}
table.Append(data)
// render table into sb
table.Render()
return sb.String()
} }