router/registry: add fallback if routes aren't found in the cache (#1758)
* router/registry: add fallback if routes aren't found in the cache * router: fix rlock bug * router/registry: pass fetchRoutes into the table, not the router
This commit is contained in:
parent
df3e5364ca
commit
deea8fecf4
@ -48,10 +48,13 @@ func newRouter(opts ...Option) Router {
|
|||||||
// construct the router
|
// construct the router
|
||||||
r := &router{
|
r := &router{
|
||||||
options: options,
|
options: options,
|
||||||
table: newTable(),
|
|
||||||
subscribers: make(map[string]chan *Advert),
|
subscribers: make(map[string]chan *Advert),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create the new table, passing the fetchRoute method in as a fallback if
|
||||||
|
// the table doesn't contain the result for a query.
|
||||||
|
r.table = newTable(r.fetchRoutes)
|
||||||
|
|
||||||
// start the router and return
|
// start the router and return
|
||||||
r.start()
|
r.start()
|
||||||
return r
|
return r
|
||||||
@ -176,6 +179,29 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchRoutes retrieves all the routes for a given service and creates them in the routing table
|
||||||
|
func (r *router) fetchRoutes(service string) error {
|
||||||
|
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed getting services: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, srv := range services {
|
||||||
|
var domain string
|
||||||
|
if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 {
|
||||||
|
domain = srv.Metadata["domain"]
|
||||||
|
} else {
|
||||||
|
domain = registry.WildcardDomain
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.manageRoutes(srv, "create", domain); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// watchRegistry watches registry and updates routing table based on the received events.
|
// watchRegistry watches registry and updates routing table based on the received events.
|
||||||
// It returns error if either the registry watcher fails with error or if the routing table update fails.
|
// It returns error if either the registry watcher fails with error or if the routing table update fails.
|
||||||
func (r *router) watchRegistry(w registry.Watcher) error {
|
func (r *router) watchRegistry(w registry.Watcher) error {
|
||||||
|
@ -19,6 +19,8 @@ var (
|
|||||||
// table is an in-memory routing table
|
// table is an in-memory routing table
|
||||||
type table struct {
|
type table struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
|
// fetchRoutes for a service
|
||||||
|
fetchRoutes func(string) error
|
||||||
// routes stores service routes
|
// routes stores service routes
|
||||||
routes map[string]map[uint64]Route
|
routes map[string]map[uint64]Route
|
||||||
// watchers stores table watchers
|
// watchers stores table watchers
|
||||||
@ -26,8 +28,9 @@ type table struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newtable creates a new routing table and returns it
|
// newtable creates a new routing table and returns it
|
||||||
func newTable(opts ...Option) *table {
|
func newTable(fetchRoutes func(string) error, opts ...Option) *table {
|
||||||
return &table{
|
return &table{
|
||||||
|
fetchRoutes: fetchRoutes,
|
||||||
routes: make(map[string]map[uint64]Route),
|
routes: make(map[string]map[uint64]Route),
|
||||||
watchers: make(map[string]*tableWatcher),
|
watchers: make(map[string]*tableWatcher),
|
||||||
}
|
}
|
||||||
@ -249,12 +252,25 @@ func (t *table) Query(q ...QueryOption) ([]Route, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if opts.Service != "*" {
|
if opts.Service != "*" {
|
||||||
if _, ok := t.routes[opts.Service]; !ok {
|
// try and load services from the cache
|
||||||
return nil, ErrRouteNotFound
|
if _, ok := t.routes[opts.Service]; ok {
|
||||||
}
|
|
||||||
return findRoutes(t.routes[opts.Service], opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), nil
|
return findRoutes(t.routes[opts.Service], opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// load the cache and try again
|
||||||
|
t.RUnlock()
|
||||||
|
if err := t.fetchRoutes(opts.Service); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
t.RLock()
|
||||||
|
if _, ok := t.routes[opts.Service]; ok {
|
||||||
|
return findRoutes(t.routes[opts.Service], opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, ErrRouteNotFound
|
||||||
|
}
|
||||||
|
|
||||||
// search through all destinations
|
// search through all destinations
|
||||||
for _, routes := range t.routes {
|
for _, routes := range t.routes {
|
||||||
results = append(results, findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy)...)
|
results = append(results, findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy)...)
|
||||||
|
@ -6,7 +6,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func testSetup() (*table, Route) {
|
func testSetup() (*table, Route) {
|
||||||
table := newTable()
|
router := newRouter().(*router)
|
||||||
|
table := router.table
|
||||||
|
|
||||||
route := Route{
|
route := Route{
|
||||||
Service: "dest.svc",
|
Service: "dest.svc",
|
||||||
|
Loading…
Reference in New Issue
Block a user