Small code refactoring. Added more comments and parseToNode func

This commit is contained in:
Milos Gajdos 2019-06-13 23:28:47 +02:00
parent 6a33b7576b
commit 322eaae529
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
7 changed files with 64 additions and 52 deletions

View File

@ -70,35 +70,30 @@ func (r *router) Network() string {
// Start starts the router
func (r *router) Start() error {
// add local service routes into the routing table
if err := r.addServiceRoutes(r.opts.LocalRegistry, "local", 1); err != nil {
if err := r.addServiceRoutes(r.opts.LocalRegistry, "local", DefaultLocalMetric); err != nil {
return fmt.Errorf("failed adding routes for local services: %v", err)
}
// add network service routes into the routing table
if err := r.addServiceRoutes(r.opts.NetworkRegistry, r.opts.NetworkAddress, 10); err != nil {
if err := r.addServiceRoutes(r.opts.NetworkRegistry, r.opts.NetworkAddress, DefaultNetworkMetric); err != nil {
return fmt.Errorf("failed adding routes for network services: %v", err)
}
// lookup local service routes and advertise them in network registry
// routing table has been bootstrapped;
// NOTE: we only need to advertise local services upstream
// lookup local service routes and advertise them upstream
query := NewQuery(QueryNetwork("local"))
localRoutes, err := r.opts.Table.Lookup(query)
if err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to lookup local service routes: %v", err)
}
addr := strings.Split(r.opts.Address, ":")
port, err := strconv.Atoi(addr[1])
node, err := r.parseToNode()
if err != nil {
fmt.Errorf("could not parse router address from %s: %v", r.opts.Address, err)
return fmt.Errorf("failed to parse router into node: %v", err)
}
for _, route := range localRoutes {
node := &registry.Node{
Id: r.opts.ID,
Address: addr[0],
Port: port,
}
service := &registry.Service{
Name: route.Options().DestAddr,
Nodes: []*registry.Node{node},
@ -108,37 +103,37 @@ func (r *router) Start() error {
}
}
lWatcher, err := r.opts.LocalRegistry.Watch()
localWatcher, err := r.opts.LocalRegistry.Watch()
if err != nil {
return fmt.Errorf("failed to create local registry watcher: %v", err)
}
rWatcher, err := r.opts.NetworkRegistry.Watch()
networkWatcher, err := r.opts.NetworkRegistry.Watch()
if err != nil {
return fmt.Errorf("failed to create network registry watcher: %v", err)
}
// we only watch local entries which we resend to network registry
tWatcher, err := r.opts.Table.Watch(WatchNetwork("local"))
// we only watch local netwrork entries which we then propagate upstream to network
tableWatcher, err := r.opts.Table.Watch(WatchNetwork("local"))
if err != nil {
return fmt.Errorf("failed to create routing table watcher: %v", err)
}
r.wg.Add(1)
go r.manageServiceRoutes(lWatcher, "local", DefaultLocalMetric)
go r.manageServiceRoutes(localWatcher, "local", DefaultLocalMetric)
r.wg.Add(1)
go r.manageServiceRoutes(rWatcher, r.opts.NetworkAddress, DefaultNetworkMetric)
go r.manageServiceRoutes(networkWatcher, r.opts.NetworkAddress, DefaultNetworkMetric)
r.wg.Add(1)
go r.watchTable(tWatcher)
go r.watchTable(tableWatcher)
return nil
}
// addServiceRouteslists all available services in given registry and adds them to the routing table.
// NOTE: this is a one-off operation done to bootstrap the rouing table of the new router when it starts.
// It returns error if the route could not be added to the routing table.
// It returns error if any of the routes could not be added to the routing table.
func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error {
services, err := reg.ListServices()
if err != nil {
@ -153,13 +148,34 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric
Metric(metric),
)
if err := r.opts.Table.Add(route); err != nil {
return fmt.Errorf("failed to add route for service: %s", service.Name)
return fmt.Errorf("error adding route for service: %s", service.Name)
}
}
return nil
}
// parseToNode parses router address into registryNode.
// It retuns error if the router network address could not be parsed into service host and port.
// NOTE: We use ":" as a default delimiter we split the network address on and then attempt to parse port into int.
func (r *router) parseToNode() (*registry.Node, error) {
// split on ":" as a standard host:port delimiter
addr := strings.Split(r.opts.NetworkAddress, ":")
// try to parse network port into integer
port, err := strconv.Atoi(addr[1])
if err != nil {
return nil, fmt.Errorf("could not parse router network address from %s: %v", r.opts.NetworkAddress, err)
}
node := &registry.Node{
Id: r.opts.ID,
Address: addr[0],
Port: port,
}
return node, nil
}
// manageServiceRoutes watches services in given registry and updates the routing table accordingly.
// It returns error if the service registry watcher has stopped or if the routing table failed to be updated.
func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric int) error {
@ -237,16 +253,9 @@ func (r *router) watchTable(w Watcher) error {
break
}
addr := strings.Split(r.opts.Address, ":")
port, err := strconv.Atoi(addr[1])
node, err := r.parseToNode()
if err != nil {
continue
}
node := &registry.Node{
Id: r.opts.ID,
Address: addr[0],
Port: port,
return fmt.Errorf("failed to parse router into node: %v", err)
}
service := &registry.Service{

View File

@ -11,8 +11,8 @@ import (
"github.com/olekukonko/tablewriter"
)
// TODO: This will allow for arbitrary routing table config.
// TableOptions are routing table options
// TODO: This will allow for arbitrary routing table options in the future
type TableOptions struct{}
// table is in memory routing table
@ -28,7 +28,7 @@ type table struct {
sync.RWMutex
}
// newTable creates default routing table and returns it
// newTable creates in memory routing table and returns it
func newTable(opts ...TableOption) Table {
// default options
var options TableOptions
@ -64,12 +64,13 @@ func (t *table) Options() TableOptions {
// Add adds a route to the routing table
func (t *table) Add(r Route) error {
t.Lock()
defer t.Unlock()
destAddr := r.Options().DestAddr
sum := t.hash(r)
t.Lock()
defer t.Unlock()
if _, ok := t.m[destAddr]; !ok {
t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r
@ -110,12 +111,12 @@ func (t *table) Remove(r Route) error {
// Update updates routing table with new route
func (t *table) Update(r Route) error {
t.Lock()
defer t.Unlock()
destAddr := r.Options().DestAddr
sum := t.hash(r)
t.Lock()
defer t.Unlock()
if _, ok := t.m[destAddr]; !ok {
return ErrRouteNotFound
}

View File

@ -25,6 +25,7 @@ type Options struct {
// LocalRegistry is router local registry
LocalRegistry registry.Registry
// NetworkRegistry is router remote registry
// NOTE: we need some abstraction on top of gossip.Registry
NetworkRegistry registry.Registry
// Table is routing table
Table Table
@ -81,7 +82,7 @@ func NetworkRegistry(r registry.Registry) Option {
}
}
// RouterIB allows to configure RIB
// RouterRIB allows to configure RIB
func RouterRIB(r RIB) Option {
return func(o *Options) {
o.RIB = r

View File

@ -10,16 +10,16 @@ const (
ClosestMatch
)
// QueryOption is used to define query options
// QueryOption sets routing table query options
type QueryOption func(*QueryOptions)
// QueryOptions allow to define routing table query options
// QueryOptions are routing table query options
type QueryOptions struct {
// DestAddr defines destination address
// DestAddr is destination address
DestAddr string
// NetworkAddress defines network address
// NetworkAddress is network address
Network string
// Policy defines query lookup policy
// Policy is query lookup policy
Policy LookupPolicy
}
@ -37,19 +37,20 @@ func QueryNetwork(a string) QueryOption {
}
}
// QueryPolicy allows to define query lookup policy
// QueryPolicy sets query policy
func QueryPolicy(p LookupPolicy) QueryOption {
return func(o *QueryOptions) {
o.Policy = p
}
}
// Query defines routing table query
// Query is routing table query
type Query interface {
// Options returns query options
Options() QueryOptions
}
// query is a basic implementation of Query
type query struct {
opts QueryOptions
}

View File

@ -13,10 +13,10 @@ type RIB interface {
String() string
}
// RIBOptopn sets RIB options
// RIBOption sets RIB options
type RIBOption func(*RIBOptions)
// RIBOptions configures various RIB options
// RIBOptions are RIB options
type RIBOptions struct {
// Source defines RIB source URL
Source string

View File

@ -13,16 +13,16 @@ var (
type AddPolicy int
const (
// Override overrides existing routing table route
// OverrideIfExists overrides route if it already exists
OverrideIfExists AddPolicy = iota
// IgnoreIfExists does not add new route
// IgnoreIfExists does not modify existing route
IgnoreIfExists
)
// RouteOption is used to define routing table entry options
// RouteOption is used to set routing table entry options
type RouteOption func(*RouteOptions)
// RouteOptions defines micro network routing table route options
// RouteOptions are route options
type RouteOptions struct {
// DestAddr is destination address
DestAddr string

View File

@ -29,7 +29,7 @@ type Result struct {
Route Route
}
// Watcher options
// WatchOptions are table watcher options
type WatchOptions struct {
// Specify destination address to watch
DestAddr string