Removed router watcher code duplication. Small code refactor.

This commit is contained in:
Milos Gajdos 2019-06-13 15:12:07 +01:00
parent 6e669d4611
commit 6a33b7576b
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
6 changed files with 42 additions and 94 deletions

View File

@ -7,7 +7,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter"
) )
@ -18,6 +17,7 @@ type router struct {
wg *sync.WaitGroup wg *sync.WaitGroup
} }
// newRouter creates new router and returns it
func newRouter(opts ...Option) Router { func newRouter(opts ...Option) Router {
// get default options // get default options
options := DefaultOptions() options := DefaultOptions()
@ -69,14 +69,14 @@ func (r *router) Network() string {
// Start starts the router // Start starts the router
func (r *router) Start() error { func (r *router) Start() error {
// add local service routes into routing table // add local service routes into the routing table
if err := r.addServiceRoutes(r.opts.LocalRegistry, "local", 1); err != nil { if err := r.addServiceRoutes(r.opts.LocalRegistry, "local", 1); err != nil {
return fmt.Errorf("failed to add service routes for local services: %v", err) return fmt.Errorf("failed adding routes for local services: %v", err)
} }
// add network service routes into routing table // add network service routes into the routing table
if err := r.addServiceRoutes(r.opts.NetworkRegistry, r.opts.NetworkAddress, 10); err != nil { if err := r.addServiceRoutes(r.opts.NetworkRegistry, r.opts.NetworkAddress, 10); err != nil {
return fmt.Errorf("failed to add service routes for network services: %v", err) return fmt.Errorf("failed adding routes for network services: %v", err)
} }
// lookup local service routes and advertise them in network registry // lookup local service routes and advertise them in network registry
@ -125,10 +125,10 @@ func (r *router) Start() error {
} }
r.wg.Add(1) r.wg.Add(1)
go r.watchLocal(lWatcher) go r.manageServiceRoutes(lWatcher, "local", DefaultLocalMetric)
r.wg.Add(1) r.wg.Add(1)
go r.watchRemote(rWatcher) go r.manageServiceRoutes(rWatcher, r.opts.NetworkAddress, DefaultNetworkMetric)
r.wg.Add(1) r.wg.Add(1)
go r.watchTable(tWatcher) go r.watchTable(tWatcher)
@ -136,23 +136,22 @@ func (r *router) Start() error {
return nil return nil
} }
// addServiceRouteslists all available services in given registry and adds them to the routing table.
// NOTE: this is a one-off operation done to bootstrap the rouing table of the new router when it starts.
// It returns error if the route could not be added to the routing table.
func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error {
// list all local services
services, err := reg.ListServices() services, err := reg.ListServices()
if err != nil { if err != nil {
return fmt.Errorf("failed to list services: %v", err) return fmt.Errorf("failed to list services: %v", err)
} }
// add services to routing table
for _, service := range services { for _, service := range services {
// create new micro network route
route := NewRoute( route := NewRoute(
DestAddr(service.Name), DestAddr(service.Name),
Gateway(r), Gateway(r),
Network(network), Network(network),
Metric(metric), Metric(metric),
) )
// add new route to routing table
if err := r.opts.Table.Add(route); err != nil { if err := r.opts.Table.Add(route); err != nil {
return fmt.Errorf("failed to add route for service: %s", service.Name) return fmt.Errorf("failed to add route for service: %s", service.Name)
} }
@ -161,10 +160,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric
return nil return nil
} }
// watch local registry // manageServiceRoutes watches services in given registry and updates the routing table accordingly.
func (r *router) watchLocal(w registry.Watcher) error { // It returns error if the service registry watcher has stopped or if the routing table failed to be updated.
func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric int) error {
defer r.wg.Done() defer r.wg.Done()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1) r.wg.Add(1)
go func() { go func() {
defer r.wg.Done() defer r.wg.Done()
@ -186,75 +188,23 @@ func (r *router) watchLocal(w registry.Watcher) error {
break break
} }
// create new route
route := NewRoute( route := NewRoute(
DestAddr(res.Service.Name), DestAddr(res.Service.Name),
Gateway(r), Gateway(r),
Network("local"), Network(network),
Metric(1), Metric(metric),
) )
switch res.Action { switch res.Action {
case "create": case "create":
if len(res.Service.Nodes) > 0 { if len(res.Service.Nodes) > 0 {
if err := r.opts.Table.Add(route); err != nil { if err := r.opts.Table.Add(route); err != nil {
log.Logf("[router] failed to add route for local service: %v", res.Service.Name) return fmt.Errorf("failed to add route for service: %v", res.Service.Name)
} }
} }
case "delete": case "delete":
if err := r.opts.Table.Remove(route); err != nil { if err := r.opts.Table.Remove(route); err != nil {
log.Logf("[router] failed to remove route for local service: %v", res.Service.Name) return fmt.Errorf("failed to remove route for service: %v", res.Service.Name)
}
}
}
return watchErr
}
// watch remote registry
func (r *router) watchRemote(w registry.Watcher) error {
defer r.wg.Done()
r.wg.Add(1)
go func() {
defer r.wg.Done()
<-r.exit
w.Stop()
}()
var watchErr error
// watch for changes to services
for {
res, err := w.Next()
if err == registry.ErrWatcherStopped {
break
}
if err != nil {
watchErr = err
break
}
// create new route
route := NewRoute(
DestAddr(res.Service.Name),
Gateway(r),
Network(r.opts.NetworkAddress),
Metric(10),
RoutePolicy(IgnoreIfExists),
)
switch res.Action {
case "create":
if len(res.Service.Nodes) > 0 {
if err := r.opts.Table.Add(route); err != nil {
log.Logf("[router] failed to add route for network service: %v", res.Service.Name)
}
}
case "delete":
if err := r.opts.Table.Remove(route); err != nil {
log.Logf("[router] failed to remove route for network service: %v", res.Service.Name)
} }
} }
} }
@ -290,7 +240,6 @@ func (r *router) watchTable(w Watcher) error {
addr := strings.Split(r.opts.Address, ":") addr := strings.Split(r.opts.Address, ":")
port, err := strconv.Atoi(addr[1]) port, err := strconv.Atoi(addr[1])
if err != nil { if err != nil {
log.Logf("[router] could not parse router address from %s: %v", r.opts.Address, err)
continue continue
} }
@ -307,14 +256,12 @@ func (r *router) watchTable(w Watcher) error {
switch res.Action { switch res.Action {
case "add": case "add":
log.Logf("[router] routing table action: %s, route: %v", res.Action, res.Route)
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil {
log.Logf("[router] failed to register service %s in network registry: %v", service.Name, err) return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
} }
case "remove": case "remove":
log.Logf("[router] routing table action: %s, route: %v", res.Action, res.Route)
if err := r.opts.NetworkRegistry.Register(service); err != nil { if err := r.opts.NetworkRegistry.Register(service); err != nil {
log.Logf("[router] failed to deregister service %s from network registry: %v", service.Name, err) return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err)
} }
} }
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter"
) )
// TODO: This will allow for arbitrary routing table config.
// TableOptions are routing table options // TableOptions are routing table options
type TableOptions struct{} type TableOptions struct{}

View File

@ -6,17 +6,17 @@ import (
) )
var ( var (
// DefaultAddress is default router bind address // DefaultGossipAddress is default gossip bind address
DefaultAddress = ":9093" DefaultGossipAddress = ":9093"
// DefaultNetworkAddress is default micro network bind address // DefaultNetworkAddress is default network bind address
DefaultNetworkAddress = ":9094" DefaultNetworkAddress = ":9094"
) )
// Options are router options // Options are router options
type Options struct { type Options struct {
// ID is router ID // ID is router id
ID string ID string
// Address is router address // Address is router micro service address
Address string Address string
// GossipAddress is router gossip address // GossipAddress is router gossip address
GossipAddress string GossipAddress string
@ -39,7 +39,7 @@ func ID(id string) Option {
} }
} }
// Address sets router address // Address sets router service address
func Address(a string) Option { func Address(a string) Option {
return func(o *Options) { return func(o *Options) {
o.Address = a o.Address = a
@ -94,7 +94,8 @@ func DefaultOptions() Options {
// TODO: DefaultRIB needs to be added once it's properly figured out // TODO: DefaultRIB needs to be added once it's properly figured out
return Options{ return Options{
ID: uuid.New().String(), ID: uuid.New().String(),
Address: DefaultAddress, Address: ":8083",
GossipAddress: DefaultGossipAddress,
NetworkAddress: DefaultNetworkAddress, NetworkAddress: DefaultNetworkAddress,
LocalRegistry: registry.DefaultRegistry, LocalRegistry: registry.DefaultRegistry,
NetworkRegistry: registry.DefaultRegistry, NetworkRegistry: registry.DefaultRegistry,

View File

@ -1,21 +1,22 @@
package router package router
// RIB is Routing Information Base // RIB is Routing Information Base.
// RIB is used to source the base routing table.
type RIB interface { type RIB interface {
// Initi initializes RIB // Initi initializes RIB
Init(...RIBOption) error Init(...RIBOption) error
// Options returns RIB options // Options returns RIB options
Options() RIBOptions Options() RIBOptions
// Routes returns routes in RIB // Routes returns routes
Routes() []Route Routes() []Route
// String returns debug info // String returns debug info
String() string String() string
} }
// RIBOptopn is used to configure RIB // RIBOptopn sets RIB options
type RIBOption func(*RIBOptions) type RIBOption func(*RIBOptions)
// RIBOptions allow to set RIB sources. // RIBOptions configures various RIB options
type RIBOptions struct { type RIBOptions struct {
// Source defines RIB source URL // Source defines RIB source URL
Source string Source string

View File

@ -2,6 +2,13 @@ package router
import "context" import "context"
var (
// DefaultLocalMetric is default route cost for local network
DefaultLocalMetric = 1
// DefaultNetworkMetric is default route cost for micro network
DefaultNetworkMetric = 10
)
// AddPolicy defines routing table addition policy // AddPolicy defines routing table addition policy
type AddPolicy int type AddPolicy int
@ -10,8 +17,6 @@ const (
OverrideIfExists AddPolicy = iota OverrideIfExists AddPolicy = iota
// IgnoreIfExists does not add new route // IgnoreIfExists does not add new route
IgnoreIfExists IgnoreIfExists
// ErrIfExists returns error if the route already exists
ErrIfExists
) )
// RouteOption is used to define routing table entry options // RouteOption is used to define routing table entry options

View File

@ -1,13 +1,6 @@
// Package router provides an interface for micro network router // Package router provides an interface for micro network router
package router package router
import "errors"
var (
// ErrNotImplemented is returned when some functionality has not been implemented
ErrNotImplemented = errors.New("not implemented")
)
// Router is micro network router // Router is micro network router
type Router interface { type Router interface {
// Init initializes the router with options // Init initializes the router with options