From fe84a2d726fe4f2f4ebc11926ca7500eaec58770 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 26 Jun 2019 16:03:19 +0100 Subject: [PATCH] Route per service node. No Network Registry for now. --- network/router/default_router.go | 191 +++++++------------------------ network/router/default_table.go | 20 ++-- network/router/options.go | 31 ++--- network/router/query.go | 50 ++++++-- network/router/route.go | 13 ++- network/router/table_watcher.go | 27 ++--- 6 files changed, 110 insertions(+), 222 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index d600d1da..a84a04ee 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -2,25 +2,14 @@ package router import ( "fmt" - "net" - "strconv" "strings" "sync" - "time" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" ) -var ( - // AdvertiseTick defines how often in seconds do we scal the local registry - // to advertise the local services to the network registry - AdvertiseTick = 5 * time.Second - // AdvertiseTTL defines network registry TTL in seconds - // NOTE: this is a rather arbitrary picked value subject to change - AdvertiseTTL = 120 * time.Second -) - +// router provides default router implementation type router struct { opts Options exit chan struct{} @@ -29,6 +18,9 @@ type router struct { // newRouter creates new router and returns it func newRouter(opts ...Option) Router { + // TODO: we need to add default GW entry here + // Should default GW be part of router options? + // get default options options := DefaultOptions() @@ -74,41 +66,24 @@ func (r *router) Address() string { // Network returns the address router advertises to the network func (r *router) Network() string { - return r.opts.Advertise + return r.opts.Network } -// Advertise advertises the router routes to the network. -// Advertise is a blocking function. It launches multiple goroutines that watch -// service registries and advertise the router routes to other routers in the network. +// Advertise advertises the routes to the network. It is a blocking function. // It returns error if any of the launched goroutines fail with error. func (r *router) Advertise() error { // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, DefaultLocalMetric); err != nil { - return fmt.Errorf("failed adding routes for local services: %v", err) - } - - // add network service routes into the routing table - if err := r.addServiceRoutes(r.opts.Network, DefaultNetworkMetric); err != nil { - return fmt.Errorf("failed adding routes for network services: %v", err) - } - - node, err := r.parseToNode() - if err != nil { - return fmt.Errorf("failed to parse router into service node: %v", err) + if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + return fmt.Errorf("failed adding routes: %v", err) } localWatcher, err := r.opts.Registry.Watch() if err != nil { - return fmt.Errorf("failed to create local registry watcher: %v", err) - } - - networkWatcher, err := r.opts.Network.Watch() - if err != nil { - return fmt.Errorf("failed to create network registry watcher: %v", err) + return fmt.Errorf("failed to create registry watcher: %v", err) } // error channel collecting goroutine errors - errChan := make(chan error, 3) + errChan := make(chan error, 1) r.wg.Add(1) go func() { @@ -117,102 +92,42 @@ func (r *router) Advertise() error { errChan <- r.manageServiceRoutes(localWatcher, DefaultLocalMetric) }() - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch network registry and register routes in routine table - errChan <- r.manageServiceRoutes(networkWatcher, DefaultNetworkMetric) - }() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and advertise local service to the network - errChan <- r.advertiseToNetwork(node) - }() - return <-errChan } // addServiceRoutes adds all services in given registry to the routing table. -// NOTE: this is a one-off operation done when bootstrapping the routing table of the new router. -// It returns error if either the services could not be listed or if the routes could not be added to the routing table. -func (r *router) addServiceRoutes(reg registry.Registry, metric int) error { +// NOTE: this is a one-off operation done when bootstrapping the routing table +// It returns error if either the services failed to be listed or +// if the routes could not be added to the routing table. +func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed to list services: %v", err) } + // add each service node as a separate route; for _, service := range services { - route := Route{ - Destination: service.Name, - Router: r, - Network: r.opts.Advertise, - Metric: metric, - } - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("error adding route for service: %s", service.Name) + for _, node := range service.Nodes { + var gw string + if node.Port > 0 { + gw = fmt.Sprintf("%s:%d", node.Address, node.Port) + } + route := Route{ + Destination: service.Name, + Gateway: gw, + Router: r.opts.Address, + Network: r.opts.Network, + Metric: metric, + } + if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + return fmt.Errorf("error adding route for service %s: %s", service.Name, err) + } } } return nil } -// parseToNode parses router into registry.Node and returns the result. -// It returns error if the router network address could not be parsed into host and port. -func (r *router) parseToNode() (*registry.Node, error) { - // split router address to host and port part - addr, portStr, err := net.SplitHostPort(r.opts.Advertise) - if err != nil { - return nil, fmt.Errorf("could not parse router address: %v", err) - } - - // try to parse network port into integer - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, fmt.Errorf("could not parse router network address: %v", err) - } - - node := ®istry.Node{ - Id: r.opts.ID, - Address: addr, - Port: port, - } - - return node, nil -} - -// advertiseToNetwork periodically scans local registry and registers (i.e. advertises) all the local services in the network registry. -// It returns error if either the local services failed to be listed or if it fails to register local service in network registry. -func (r *router) advertiseToNetwork(node *registry.Node) error { - // ticker to periodically scan the local registry - ticker := time.NewTicker(AdvertiseTick) - - for { - select { - case <-r.exit: - return nil - case <-ticker.C: - // list all local services - services, err := r.opts.Registry.ListServices() - if err != nil { - return fmt.Errorf("failed to list local services: %v", err) - } - // loop through all registered local services and register them in the network registry - for _, service := range services { - svc := ®istry.Service{ - Name: service.Name, - Nodes: []*registry.Node{node}, - } - // register the local service in the network registry - if err := r.opts.Network.Register(svc, registry.RegisterTTL(AdvertiseTTL)); err != nil { - return fmt.Errorf("failed to register service %s in network registry: %v", svc.Name, err) - } - } - } - } -} - // manageServiceRoutes watches services in given registry and updates the routing table accordingly. // It returns error if the service registry watcher has stopped or if the routing table failed to be updated. func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { @@ -240,25 +155,21 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { route := Route{ Destination: res.Service.Name, - Router: r, - Network: r.opts.Advertise, + Router: r.opts.Address, + Network: r.opts.Network, Metric: metric, } switch res.Action { case "create": - if len(res.Service.Nodes) > 0 { - // only return error if the route is not duplicate, but something else has failed - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("failed to add route for service: %v", res.Service.Name) - } + // only return error if the route is not duplicate, but something else has failed + if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + return fmt.Errorf("failed to add route for service %v: %s", res.Service.Name, err) } case "delete": - if len(res.Service.Nodes) < 1 { - // only return error if the route is present in the table, but something else has failed - if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed to delete route for service: %v", res.Service.Name) - } + // only return error if the route is not in the table, but something else has failed + if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + return fmt.Errorf("failed to delete route for service %v: %s", res.Service.Name, err) } } } @@ -274,30 +185,6 @@ func (r *router) Stop() error { // wait for all goroutines to finish r.wg.Wait() - // NOTE: we need a more efficient way of doing this e.g. network routes - // should ideally be autodeleted when the router stops gossiping - query := NewQuery(QueryRouter(r), QueryNetwork(r.opts.Advertise)) - routes, err := r.opts.Table.Lookup(query) - if err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed to lookup routes for router %s: %v", r.opts.ID, err) - } - - // parse router to registry.Node - node, err := r.parseToNode() - if err != nil { - return fmt.Errorf("failed to parse router into service node: %v", err) - } - - for _, route := range routes { - service := ®istry.Service{ - Name: route.Destination, - Nodes: []*registry.Node{node}, - } - if err := r.opts.Network.Deregister(service); err != nil { - return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err) - } - } - return nil } @@ -311,7 +198,7 @@ func (r *router) String() string { data := []string{ r.opts.ID, r.opts.Address, - r.opts.Advertise, + r.opts.Network, fmt.Sprintf("%d", r.opts.Table.Size()), } table.Append(data) diff --git a/network/router/default_table.go b/network/router/default_table.go index 04e8f6db..196ca4bf 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -171,7 +171,7 @@ func (t *table) Lookup(q Query) ([]Route, error) { } for _, route := range routes { if q.Options().Network == "*" || q.Options().Network == route.Network { - if q.Options().Router.ID() == "*" || q.Options().Router.ID() == route.Router.ID() { + if q.Options().Router == "*" { if route.Metric <= q.Options().Metric { results = append(results, route) } @@ -182,8 +182,8 @@ func (t *table) Lookup(q Query) ([]Route, error) { if q.Options().Destination == "*" { for _, route := range routes { - if q.Options().Network == "*" || q.Options().Network == route.Router.Network() { - if q.Options().Router.ID() == "*" || q.Options().Router.ID() == route.Router.ID() { + if q.Options().Network == "*" || q.Options().Network == route.Network { + if q.Options().Router == "*" { if route.Metric <= q.Options().Metric { results = append(results, route) } @@ -193,7 +193,7 @@ func (t *table) Lookup(q Query) ([]Route, error) { } } - if len(results) == 0 && q.Options().Policy != DiscardNoRoute { + if len(results) == 0 && q.Options().Policy != DiscardIfNone { return nil, ErrRouteNotFound } @@ -205,7 +205,6 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) { // by default watch everything wopts := WatchOptions{ Destination: "*", - Network: "*", } for _, o := range opts { @@ -256,13 +255,14 @@ func (t *table) String() string { // create nice table printing structure table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination", "Router", "Network", "Metric"}) + table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"}) for _, destRoute := range t.m { for _, route := range destRoute { strRoute := []string{ route.Destination, - route.Router.Address(), + route.Gateway, + route.Router, route.Network, fmt.Sprintf("%d", route.Metric), } @@ -278,12 +278,8 @@ func (t *table) String() string { // hash hashes the route using router gateway and network address func (t *table) hash(r Route) uint64 { - destAddr := r.Destination - routerAddr := r.Router.Address() - netAddr := r.Network - t.h.Reset() - t.h.Write([]byte(destAddr + routerAddr + netAddr)) + t.h.Write([]byte(r.Destination + r.Gateway + r.Router + r.Network)) return t.h.Sum64() } diff --git a/network/router/options.go b/network/router/options.go index ee0c6526..3ede4776 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -8,8 +8,6 @@ import ( var ( // DefaultAddress is default router address DefaultAddress = ":9093" - // DefaultAdvertise is default address advertised to the network - DefaultAdvertise = ":9094" ) // Options are router options @@ -18,12 +16,10 @@ type Options struct { ID string // Address is router address Address string - // Advertise is the address advertised to the network - Advertise string + // Network is micro network + Network string // Registry is the local registry Registry registry.Registry - // Networkis the network registry - Network registry.Registry // Table is routing table Table Table } @@ -42,10 +38,10 @@ func Address(a string) Option { } } -// Advertise sets the address that is advertise to the network -func Advertise(n string) Option { +// Network sets router network +func Network(n string) Option { return func(o *Options) { - o.Advertise = n + o.Network = n } } @@ -63,22 +59,13 @@ func Registry(r registry.Registry) Option { } } -// Network sets the network registry -func Network(r registry.Registry) Option { - return func(o *Options) { - o.Network = r - } -} - // DefaultOptions returns router default options func DefaultOptions() Options { // NOTE: by default both local and network registies use default registry i.e. mdns return Options{ - ID: uuid.New().String(), - Address: DefaultAddress, - Advertise: DefaultAdvertise, - Registry: registry.DefaultRegistry, - Network: registry.DefaultRegistry, - Table: NewTable(), + ID: uuid.New().String(), + Address: DefaultAddress, + Registry: registry.DefaultRegistry, + Table: NewTable(), } } diff --git a/network/router/query.go b/network/router/query.go index 7c18e075..c479f9db 100644 --- a/network/router/query.go +++ b/network/router/query.go @@ -1,11 +1,18 @@ package router +import ( + "fmt" + "strings" + + "github.com/olekukonko/tablewriter" +) + // LookupPolicy defines query policy type LookupPolicy int const ( - // DiscardNoRoute discards query when no route is found - DiscardNoRoute LookupPolicy = iota + // DiscardIfNone discards query when no route is found + DiscardIfNone LookupPolicy = iota // ClosestMatch returns closest match to supplied query ClosestMatch ) @@ -13,7 +20,7 @@ const ( // String returns human representation of LookupPolicy func (lp LookupPolicy) String() string { switch lp { - case DiscardNoRoute: + case DiscardIfNone: return "DISCARD" case ClosestMatch: return "CLOSEST" @@ -29,10 +36,10 @@ type QueryOption func(*QueryOptions) type QueryOptions struct { // Destination is destination address Destination string + // Router is router address + Router string // Network is network address Network string - // Router is gateway address - Router Router // Metric is route metric Metric int // Policy is query lookup policy @@ -54,7 +61,7 @@ func QueryNetwork(a string) QueryOption { } // QueryRouter sets query gateway address -func QueryRouter(r Router) QueryOption { +func QueryRouter(r string) QueryOption { return func(o *QueryOptions) { o.Router = r } @@ -88,17 +95,14 @@ type query struct { // NewQuery creates new query and returns it func NewQuery(opts ...QueryOption) Query { - // default gateway for wildcard router - r := newRouter(ID("*")) - // default options // NOTE: by default we use DefaultNetworkMetric qopts := QueryOptions{ Destination: "*", + Router: "*", Network: "*", - Router: r, Metric: DefaultNetworkMetric, - Policy: DiscardNoRoute, + Policy: DiscardIfNone, } for _, o := range opts { @@ -114,3 +118,27 @@ func NewQuery(opts ...QueryOption) Query { func (q *query) Options() QueryOptions { return q.opts } + +// String prints routing table query in human readable form +func (q query) String() string { + // this will help us build routing table string + sb := &strings.Builder{} + + // create nice table printing structure + table := tablewriter.NewWriter(sb) + table.SetHeader([]string{"Destination", "Router", "Network", "Metric", "Policy"}) + + strQuery := []string{ + q.opts.Destination, + q.opts.Router, + q.opts.Network, + fmt.Sprintf("%d", q.opts.Metric), + fmt.Sprintf("%s", q.opts.Policy), + } + table.Append(strQuery) + + // render table into sb + table.Render() + + return sb.String() +} diff --git a/network/router/route.go b/network/router/route.go index 88fe5b18..d52c2688 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -20,7 +20,7 @@ type RoutePolicy int const ( // OverrideIfExists overrides route if it already exists OverrideIfExists RoutePolicy = iota - // IgnoreIfExists does not modify existing route + // IgnoreIfExists instructs to not modify existing route IgnoreIfExists ) @@ -40,8 +40,10 @@ func (p RoutePolicy) String() string { type Route struct { // Destination is destination address Destination string - // Router is the network router - Router Router + // Gateway is route gateway + Gateway string + // Router is the network router address + Router string // Network is micro network address Network string // Metric is the route cost metric @@ -57,11 +59,12 @@ func (r *Route) String() string { // create nice table printing structure table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination", "Router", "Network", "Metric"}) + table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"}) strRoute := []string{ r.Destination, - r.Router.Address(), + r.Gateway, + r.Router, r.Network, fmt.Sprintf("%d", r.Metric), } diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 17a0971f..97d1f1b7 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -64,22 +64,13 @@ type Watcher interface { type WatchOptions struct { // Specify destination address to watch Destination string - // Specify network to watch - Network string } // WatchDestination sets what destination to watch // Destination is usually microservice name -func WatchDestination(a string) WatchOption { +func WatchDestination(d string) WatchOption { return func(o *WatchOptions) { - o.Destination = a - } -} - -// WatchNetwork sets what network to watch -func WatchNetwork(n string) WatchOption { - return func(o *WatchOptions) { - o.Network = n + o.Destination = d } } @@ -90,19 +81,16 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly -// we are aiming to provide the same options Query provides +// TODO: this needs to be thought through properly; we only allow watching particular route destination func (w *tableWatcher) Next() (*Event, error) { for { select { case res := <-w.resChan: switch w.opts.Destination { case "*", "": - if w.opts.Network == "*" || w.opts.Network == res.Route.Network { - return res, nil - } - case res.Route.Destination: - if w.opts.Network == "*" || w.opts.Network == res.Route.Network { + return res, nil + default: + if w.opts.Destination == res.Route.Destination { return res, nil } } @@ -132,11 +120,10 @@ func (w *tableWatcher) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination", "Network"}) + table.SetHeader([]string{"Destination"}) data := []string{ w.opts.Destination, - w.opts.Network, } table.Append(data)