From 95fc625e998894afe4f91f1459667f3168cdfda9 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 12 Jun 2019 22:30:42 +0100 Subject: [PATCH] Big refactor. New Registry watchers. New options. New names. --- router/default.go | 218 +++++++++++++++++++++++++++++++++------- router/entry.go | 6 ++ router/options.go | 84 +++++++++++----- router/query.go | 35 ++++--- router/rib.go | 26 +++++ router/router.go | 19 ++-- router/table.go | 44 ++++++-- router/table_watcher.go | 4 +- 8 files changed, 346 insertions(+), 90 deletions(-) create mode 100644 router/rib.go diff --git a/router/default.go b/router/default.go index 5d2ffc29..7ae8a718 100644 --- a/router/default.go +++ b/router/default.go @@ -2,41 +2,33 @@ package router import ( "fmt" + "strconv" "strings" "sync" + "time" "github.com/micro/go-log" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/gossip" "github.com/olekukonko/tablewriter" ) type router struct { opts Options - goss registry.Registry exit chan struct{} wg *sync.WaitGroup } func newRouter(opts ...Option) Router { - // set default options - options := Options{ - Table: NewTable(), - } + // get default options + options := DefaultOptions() // apply requested options for _, o := range opts { o(&options) } - // bind to gossip address to join gossip registry - goss := gossip.NewRegistry( - gossip.Address(options.GossipAddr), - ) - return &router{ opts: options, - goss: goss, exit: make(chan struct{}), wg: &sync.WaitGroup{}, } @@ -65,29 +57,78 @@ func (r *router) Address() string { return r.opts.Address } +// Gossip returns gossip bind address +func (r *router) Gossip() string { + return r.opts.GossipAddress +} + // Network returns router's micro network func (r *router) Network() string { - return r.opts.NetworkAddr + return r.opts.NetworkAddress } // Start starts the router func (r *router) Start() error { - // TODO: - // - list all remote services and populate routing table - // - list all local services and populate remote registry - - gWatcher, err := r.goss.Watch() - if err != nil { - return fmt.Errorf("failed to create router gossip registry watcher: %v", err) + // add local service routes into routing table + if err := r.addServiceRoutes(r.opts.LocalRegistry, "local", 1); err != nil { + return fmt.Errorf("failed to add service routes for local services: %v", err) } - tWatcher, err := r.opts.Table.Watch() + // add network service routes into routing table + if err := r.addServiceRoutes(r.opts.NetworkRegistry, r.opts.NetworkAddress, 10); err != nil { + return fmt.Errorf("failed to add service routes for network services: %v", err) + } + + // lookup local service routes and advertise them in network registry + query := NewQuery(QueryNetwork("local")) + localRoutes, err := r.opts.Table.Lookup(query) + if err != nil && err != ErrRouteNotFound { + return fmt.Errorf("failed to lookup local service routes: %v", err) + } + + addr := strings.Split(r.opts.Address, ":") + port, err := strconv.Atoi(addr[1]) + if err != nil { + fmt.Errorf("could not parse router address from %s: %v", r.opts.Address, err) + } + + for _, route := range localRoutes { + node := ®istry.Node{ + Id: r.opts.ID, + Address: addr[0], + Port: port, + } + + service := ®istry.Service{ + Name: route.Options().DestAddr, + Nodes: []*registry.Node{node}, + } + if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { + return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) + } + } + + lWatcher, err := r.opts.LocalRegistry.Watch() + if err != nil { + return fmt.Errorf("failed to create local registry watcher: %v", err) + } + + rWatcher, err := r.opts.NetworkRegistry.Watch() + if err != nil { + return fmt.Errorf("failed to create network registry watcher: %v", err) + } + + // we only watch local entries which we resend to network registry + tWatcher, err := r.opts.Table.Watch(WatchNetwork("local")) if err != nil { return fmt.Errorf("failed to create routing table watcher: %v", err) } r.wg.Add(1) - go r.watchGossip(gWatcher) + go r.watchLocal(lWatcher) + + r.wg.Add(1) + go r.watchRemote(rWatcher) r.wg.Add(1) go r.watchTable(tWatcher) @@ -95,15 +136,39 @@ func (r *router) Start() error { return nil } -// watch gossip registry -func (r *router) watchGossip(w registry.Watcher) error { +func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { + // list all local services + services, err := reg.ListServices() + if err != nil { + return fmt.Errorf("failed to list services: %v", err) + } + + // add services to routing table + for _, service := range services { + // create new micro network route + route := NewRoute( + DestAddr(service.Name), + Gateway(r), + Network(network), + Metric(metric), + ) + // add new route to routing table + if err := r.opts.Table.Add(route); err != nil { + return fmt.Errorf("failed to add route for service: %s", service.Name) + } + } + + return nil +} + +// watch local registry +func (r *router) watchLocal(w registry.Watcher) error { defer r.wg.Done() r.wg.Add(1) go func() { defer r.wg.Done() <-r.exit - // stop gossip registry watcher w.Stop() }() @@ -121,20 +186,83 @@ func (r *router) watchGossip(w registry.Watcher) error { break } + // create new route + route := NewRoute( + DestAddr(res.Service.Name), + Gateway(r), + Network("local"), + Metric(1), + ) + switch res.Action { case "create": if len(res.Service.Nodes) > 0 { - log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) + if err := r.opts.Table.Add(route); err != nil { + log.Logf("[router] failed to add route for local service: %v", res.Service.Name) + } } case "delete": - log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) + if err := r.opts.Table.Remove(route); err != nil { + log.Logf("[router] failed to remove route for local service: %v", res.Service.Name) + } } } return watchErr } -// watch gossip registry +// watch remote registry +func (r *router) watchRemote(w registry.Watcher) error { + defer r.wg.Done() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + <-r.exit + w.Stop() + }() + + var watchErr error + + // watch for changes to services + for { + res, err := w.Next() + if err == registry.ErrWatcherStopped { + break + } + + if err != nil { + watchErr = err + break + } + + // create new route + route := NewRoute( + DestAddr(res.Service.Name), + Gateway(r), + Network(r.opts.NetworkAddress), + Metric(10), + RoutePolicy(IgnoreIfExists), + ) + + switch res.Action { + case "create": + if len(res.Service.Nodes) > 0 { + if err := r.opts.Table.Add(route); err != nil { + log.Logf("[router] failed to add route for network service: %v", res.Service.Name) + } + } + case "delete": + if err := r.opts.Table.Remove(route); err != nil { + log.Logf("[router] failed to remove route for network service: %v", res.Service.Name) + } + } + } + + return watchErr +} + +// watch routing table changes func (r *router) watchTable(w Watcher) error { defer r.wg.Done() @@ -142,7 +270,6 @@ func (r *router) watchTable(w Watcher) error { go func() { defer r.wg.Done() <-r.exit - // stop gossip registry watcher w.Stop() }() @@ -160,11 +287,35 @@ func (r *router) watchTable(w Watcher) error { break } + addr := strings.Split(r.opts.Address, ":") + port, err := strconv.Atoi(addr[1]) + if err != nil { + log.Logf("[router] could not parse router address from %s: %v", r.opts.Address, err) + continue + } + + node := ®istry.Node{ + Id: r.opts.ID, + Address: addr[0], + Port: port, + } + + service := ®istry.Service{ + Name: res.Route.Options().DestAddr, + Nodes: []*registry.Node{node}, + } + switch res.Action { case "add": - log.Logf("Action: %s, Route: %v", res.Action, res.Route) + log.Logf("[router] routing table action: %s, route: %v", res.Action, res.Route) + if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { + log.Logf("[router] failed to register service %s in network registry: %v", service.Name, err) + } case "remove": - log.Logf("Action: %s, Route: %v", res.Action, res.Route) + log.Logf("[router] routing table action: %s, route: %v", res.Action, res.Route) + if err := r.opts.NetworkRegistry.Register(service); err != nil { + log.Logf("[router] failed to deregister service %s from network registry: %v", service.Name, err) + } } } @@ -187,13 +338,12 @@ func (r *router) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"ID", "Address", "Gossip", "Network", "Table"}) + table.SetHeader([]string{"ID", "Address", "Network", "Table"}) data := []string{ r.opts.ID, r.opts.Address, - r.opts.GossipAddr, - r.opts.NetworkAddr, + r.opts.NetworkAddress, fmt.Sprintf("%d", r.opts.Table.Size()), } table.Append(data) diff --git a/router/entry.go b/router/entry.go index d446a3d9..3b94a562 100644 --- a/router/entry.go +++ b/router/entry.go @@ -1,11 +1,15 @@ package router +import "context" + // AddPolicy defines routing table addition policy type AddPolicy int const ( // Override overrides existing routing table route OverrideIfExists AddPolicy = iota + // IgnoreIfExists does not add new route + IgnoreIfExists // ErrIfExists returns error if the route already exists ErrIfExists ) @@ -22,6 +26,8 @@ type RouteOptions struct { Metric int // Policy defines route addition policy Policy AddPolicy + // Context stores other arbitrary options + Context context.Context } // DestAddr sets destination address diff --git a/router/options.go b/router/options.go index f8c34091..9774f237 100644 --- a/router/options.go +++ b/router/options.go @@ -1,25 +1,35 @@ package router import ( - "context" + "github.com/google/uuid" + "github.com/micro/go-micro/registry" ) -// Options allows to set Router options +var ( + // DefaultAddress is default router bind address + DefaultAddress = ":9093" + // DefaultNetworkAddress is default micro network bind address + DefaultNetworkAddress = ":9094" +) + +// Options allows to set router options type Options struct { // ID is router ID ID string // Address is router address Address string - // GossipAddr is router gossip address - GossipAddr string - // NetworkAddr defines micro network address - NetworkAddr string - // RIB is Routing Information Base - RIB RIB + // GossipAddress is router gossip address + GossipAddress string + // NetworkAddress is micro network address + NetworkAddress string + // LocalRegistry is router local registry + LocalRegistry registry.Registry + // NetworkRegistry is router remote registry + NetworkRegistry registry.Registry // Table is routing table Table Table - // Context stores arbitrary options - Context context.Context + // RIB is Routing Information Base + RIB RIB } // ID sets Router ID @@ -36,24 +46,17 @@ func Address(a string) Option { } } -// GossipAddr sets router gossip address -func GossipAddr(a string) Option { +// GossipAddress sets router gossip address +func GossipAddress(a string) Option { return func(o *Options) { - o.GossipAddr = a + o.GossipAddress = a } } -// NetworkAddr sets router network address -func NetworkAddr(n string) Option { +// NetworkAddress sets router network address +func NetworkAddress(n string) Option { return func(o *Options) { - o.NetworkAddr = n - } -} - -// RIBase allows to configure RIB -func RIBase(r RIB) Option { - return func(o *Options) { - o.RIB = r + o.NetworkAddress = n } } @@ -63,3 +66,38 @@ func RoutingTable(t Table) Option { o.Table = t } } + +// LocalRegistry allows to specify local registry +func LocalRegistry(r registry.Registry) Option { + return func(o *Options) { + o.LocalRegistry = r + } +} + +// NetworkRegistry allows to specify remote registry +func NetworkRegistry(r registry.Registry) Option { + return func(o *Options) { + o.NetworkRegistry = r + } +} + +// RouterIB allows to configure RIB +func RouterIB(r RIB) Option { + return func(o *Options) { + o.RIB = r + } +} + +// DefaultOptions returns router default options +func DefaultOptions() Options { + // NOTE: by default both local and network registies use default registry i.e. mdns + // TODO: DefaultRIB needs to be added once it's properly figured out + return Options{ + ID: uuid.New().String(), + Address: DefaultAddress, + NetworkAddress: DefaultNetworkAddress, + LocalRegistry: registry.DefaultRegistry, + NetworkRegistry: registry.DefaultRegistry, + Table: NewTable(), + } +} diff --git a/router/query.go b/router/query.go index a022eda2..2cc0bf24 100644 --- a/router/query.go +++ b/router/query.go @@ -12,18 +12,25 @@ const ( // QueryOptions allow to define routing table query options type QueryOptions struct { - // Route allows to set route options - RouteOptions *RouteOptions + // DestAddr defines destination address + DestAddr string + // NetworkAddress defines network address + Network string // Policy defines query lookup policy Policy LookupPolicy - // Count defines max number of results to return - Count int } -// QueryRouteOpts allows to set the route query options -func QueryRouteOptons(r *RouteOptions) QueryOption { +// QueryDestAddr sets query destination address +func QueryDestAddr(a string) QueryOption { return func(o *QueryOptions) { - o.RouteOptions = r + o.DestAddr = a + } +} + +// QueryNetwork sets query network address +func QueryNetwork(a string) QueryOption { + return func(o *QueryOptions) { + o.Network = a } } @@ -34,13 +41,6 @@ func QueryPolicy(p LookupPolicy) QueryOption { } } -// QueryCount allows to set max results to return -func QueryCount(c int) QueryOption { - return func(o *QueryOptions) { - o.Count = c - } -} - // Query defines routing table query type Query interface { // Options returns query options @@ -53,7 +53,12 @@ type query struct { // NewQuery creates new query and returns it func NewQuery(opts ...QueryOption) Query { - qopts := QueryOptions{} + // default options + qopts := QueryOptions{ + DestAddr: "*", + Network: "*", + Policy: DiscardNoRoute, + } for _, o := range opts { o(&qopts) diff --git a/router/rib.go b/router/rib.go new file mode 100644 index 00000000..1c6f538e --- /dev/null +++ b/router/rib.go @@ -0,0 +1,26 @@ +package router + +// RIB is Routing Information Base +type RIB interface { + // Initi initializes RIB + Init(...RIBOption) error + // Options returns RIB options + Options() RIBOptions + // Routes returns routes in RIB + Routes() []Route + // String returns debug info + String() string +} + +// RIBOptions allow to set RIB sources. +type RIBOptions struct { + // Source defines RIB source URL + Source string +} + +// Source sets RIB source +func Source(s string) RIBOption { + return func(o *RIBOptions) { + o.Source = s + } +} diff --git a/router/router.go b/router/router.go index 40b49bc5..419b62d3 100644 --- a/router/router.go +++ b/router/router.go @@ -3,26 +3,22 @@ package router // Router is micro network router type Router interface { - // Initi initializes Router with options + // Init initializes the router with options Init(...Option) error - // Options returns Router options + // Options returns the router options Options() Options // Table returns routing table Table() Table // Address returns router adddress Address() string + // Gossip returns router gossip address + Gossip() string // Network returns router network address Network() string - // Start starts router + // Start starts the router Start() error - // Stop stops router + // Stop stops the router Stop() error - // String returns router debug info - String() string -} - -// RIB is Routing Information Base -type RIB interface { // String returns debug info String() string } @@ -30,6 +26,9 @@ type RIB interface { // Option used by the router type Option func(*Options) +// RIBOptopn is used to configure RIB +type RIBOption func(*RIBOptions) + // RouteOption is used to define routing table entry options type RouteOption func(*RouteOptions) diff --git a/router/table.go b/router/table.go index 9611bf65..d246e177 100644 --- a/router/table.go +++ b/router/table.go @@ -28,7 +28,7 @@ type Table interface { // Remove removes existing route from the table Remove(Route) error // Update updates route in the table - Update(...RouteOption) error + Update(Route) error // Lookup looks up routes in the table Lookup(Query) ([]Route, error) // Watch returns a watcher which allows you to track updates to the table @@ -83,6 +83,10 @@ func (t *table) Add(r Route) error { return nil } + if r.Options().Policy == IgnoreIfExists { + return nil + } + return ErrDuplicateRoute } @@ -104,13 +108,11 @@ func (t *table) Remove(r Route) error { return nil } -// Update updates routing table using propvided options -func (t *table) Update(opts ...RouteOption) error { +// Update updates routing table with new route +func (t *table) Update(r Route) error { t.Lock() defer t.Unlock() - r := NewRoute(opts...) - destAddr := r.Options().DestAddr sum := t.hash(r) @@ -129,7 +131,37 @@ func (t *table) Update(opts ...RouteOption) error { // Lookup queries routing table and returns all routes that match it func (t *table) Lookup(q Query) ([]Route, error) { - return nil, ErrNotImplemented + t.RLock() + defer t.RUnlock() + + var results []Route + + for destAddr, routes := range t.m { + if q.Options().DestAddr != "*" { + if q.Options().DestAddr != destAddr { + continue + } + for _, route := range routes { + if q.Options().Network == "*" || q.Options().Network == route.Options().Network { + results = append(results, route) + } + } + } + + if q.Options().DestAddr == "*" { + for _, route := range routes { + if q.Options().Network == "*" || q.Options().Network == route.Options().Network { + results = append(results, route) + } + } + } + } + + if len(results) == 0 && q.Options().Policy != DiscardNoRoute { + return nil, ErrRouteNotFound + } + + return results, nil } // Watch returns routing table entry watcher diff --git a/router/table_watcher.go b/router/table_watcher.go index c246100b..79aa7c46 100644 --- a/router/table_watcher.go +++ b/router/table_watcher.go @@ -54,14 +54,14 @@ type tableWatcher struct { done chan struct{} } -// TODO: We might simply use Query here once QueryLookup is figured out +// TODO: this needs to be thought through properly // Next returns the next noticed action taken on table func (w *tableWatcher) Next() (*Result, error) { for { select { case res := <-w.resChan: switch w.opts.DestAddr { - case "*": + case "*", "": if w.opts.Network == "*" || w.opts.Network == res.Route.Options().Network { return res, nil }