Registry router fixes (#1961)
* only cache routes if told to do so * Use roundrobin selector and retry in proxy * Update lookup to require service * Fix compile * Fix compile * Update * Update * rename query to lookup * Update router.go * Update
This commit is contained in:
parent
a86a52cb7c
commit
ad5959318c
138
registry.go
138
registry.go
@ -47,7 +47,7 @@ func NewRouter(opts ...router.Option) router.Router {
|
||||
|
||||
// create the new table, passing the fetchRoute method in as a fallback if
|
||||
// the table doesn't contain the result for a query.
|
||||
r.table = newTable(r.lookup)
|
||||
r.table = newTable()
|
||||
|
||||
// start the router
|
||||
r.start()
|
||||
@ -241,8 +241,41 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close the router
|
||||
func (r *rtr) Close() error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
select {
|
||||
case <-r.exit:
|
||||
return nil
|
||||
default:
|
||||
if !r.running {
|
||||
return nil
|
||||
}
|
||||
close(r.exit)
|
||||
|
||||
}
|
||||
|
||||
r.running = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// lookup retrieves all the routes for a given service and creates them in the routing table
|
||||
func (r *rtr) lookup(service string) ([]router.Route, error) {
|
||||
func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
|
||||
q := router.NewLookup(opts...)
|
||||
|
||||
// if we find the routes filter and return them
|
||||
routes, err := r.table.Query(service)
|
||||
if err == nil {
|
||||
routes = router.Filter(routes, q)
|
||||
if len(routes) == 0 {
|
||||
return nil, router.ErrRouteNotFound
|
||||
}
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
// lookup the route
|
||||
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
|
||||
|
||||
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
|
||||
@ -254,8 +287,6 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
|
||||
return nil, fmt.Errorf("failed getting services: %v", err)
|
||||
}
|
||||
|
||||
var routes []router.Route
|
||||
|
||||
for _, srv := range services {
|
||||
domain := getDomain(srv)
|
||||
// TODO: should we continue to send the event indicating we created a route?
|
||||
@ -263,6 +294,17 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
|
||||
routes = append(routes, r.createRoutes(srv, domain)...)
|
||||
}
|
||||
|
||||
// if we're supposed to cache then save the routes
|
||||
if r.options.Cache {
|
||||
for _, route := range routes {
|
||||
r.table.Create(route)
|
||||
}
|
||||
}
|
||||
|
||||
routes = router.Filter(routes, q)
|
||||
if len(routes) == 0 {
|
||||
return nil, router.ErrRouteNotFound
|
||||
}
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
@ -324,13 +366,6 @@ func (r *rtr) start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if r.options.Precache {
|
||||
// add all local service routes into the routing table
|
||||
if err := r.loadRoutes(r.options.Registry); err != nil {
|
||||
return fmt.Errorf("failed loading registry routes: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// add default gateway into routing table
|
||||
if r.options.Gateway != "" {
|
||||
// note, the only non-default value is the gateway
|
||||
@ -350,25 +385,59 @@ func (r *rtr) start() error {
|
||||
|
||||
// create error and exit channels
|
||||
r.exit = make(chan bool)
|
||||
r.running = true
|
||||
|
||||
// periodically refresh all the routes
|
||||
// only cache if told to do so
|
||||
if !r.options.Cache {
|
||||
return nil
|
||||
}
|
||||
|
||||
// create a refresh notify channel
|
||||
refresh := make(chan bool, 1)
|
||||
|
||||
// fires the refresh for loading routes
|
||||
refreshRoutes := func() {
|
||||
select {
|
||||
case refresh <- true:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// refresh all the routes in the event of a failure watching the registry
|
||||
go func() {
|
||||
t1 := time.NewTicker(RefreshInterval)
|
||||
defer t1.Stop()
|
||||
var lastRefresh time.Time
|
||||
|
||||
t2 := time.NewTicker(PruneInterval)
|
||||
defer t2.Stop()
|
||||
// load a refresh
|
||||
refreshRoutes()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.exit:
|
||||
return
|
||||
case <-t2.C:
|
||||
r.table.pruneRoutes(RefreshInterval)
|
||||
case <-t1.C:
|
||||
case <-refresh:
|
||||
// don't refresh if we've done so in the past minute
|
||||
if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute {
|
||||
continue
|
||||
}
|
||||
|
||||
// load new routes
|
||||
if err := r.loadRoutes(r.options.Registry); err != nil {
|
||||
logger.Debugf("failed refreshing registry routes: %s", err)
|
||||
// in this don't prune
|
||||
continue
|
||||
}
|
||||
|
||||
// first time so nothing to prune
|
||||
if !lastRefresh.IsZero() {
|
||||
// prune any routes since last refresh since we've
|
||||
// updated basically everything we care about
|
||||
r.table.pruneRoutes(time.Since(lastRefresh))
|
||||
}
|
||||
|
||||
// update the refresh time
|
||||
lastRefresh = time.Now()
|
||||
case <-time.After(RefreshInterval):
|
||||
refreshRoutes()
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -386,6 +455,8 @@ func (r *rtr) start() error {
|
||||
logger.Debugf("failed creating registry watcher: %v", err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
// in the event of an error reload routes
|
||||
refreshRoutes()
|
||||
continue
|
||||
}
|
||||
|
||||
@ -395,46 +466,21 @@ func (r *rtr) start() error {
|
||||
logger.Debugf("Error watching the registry: %v", err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
// in the event of an error reload routes
|
||||
refreshRoutes()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
r.running = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lookup routes in the routing table
|
||||
func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) {
|
||||
return r.Table().Query(q...)
|
||||
}
|
||||
|
||||
// Watch routes
|
||||
func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
|
||||
return r.table.Watch(opts...)
|
||||
}
|
||||
|
||||
// Close the router
|
||||
func (r *rtr) Close() error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
select {
|
||||
case <-r.exit:
|
||||
return nil
|
||||
default:
|
||||
if !r.running {
|
||||
return nil
|
||||
}
|
||||
close(r.exit)
|
||||
|
||||
}
|
||||
|
||||
r.running = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// String prints debugging information about router
|
||||
func (r *rtr) String() string {
|
||||
return "registry"
|
||||
|
136
table.go
136
table.go
@ -12,8 +12,6 @@ import (
|
||||
// table is an in-memory routing table
|
||||
type table struct {
|
||||
sync.RWMutex
|
||||
// lookup for a service
|
||||
lookup func(string) ([]router.Route, error)
|
||||
// routes stores service routes
|
||||
routes map[string]map[uint64]*route
|
||||
// watchers stores table watchers
|
||||
@ -26,9 +24,8 @@ type route struct {
|
||||
}
|
||||
|
||||
// newtable creates a new routing table and returns it
|
||||
func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table {
|
||||
func newTable() *table {
|
||||
return &table{
|
||||
lookup: lookup,
|
||||
routes: make(map[string]map[uint64]*route),
|
||||
watchers: make(map[string]*tableWatcher),
|
||||
}
|
||||
@ -216,136 +213,23 @@ func (t *table) List() ([]router.Route, error) {
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
// isMatch checks if the route matches given query options
|
||||
func isMatch(route router.Route, address, gateway, network, rtr, link string) bool {
|
||||
// matches the values provided
|
||||
match := func(a, b string) bool {
|
||||
if a == "*" || b == "*" || a == b {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// a simple struct to hold our values
|
||||
type compare struct {
|
||||
a string
|
||||
b string
|
||||
}
|
||||
|
||||
// compare the following values
|
||||
values := []compare{
|
||||
{gateway, route.Gateway},
|
||||
{network, route.Network},
|
||||
{rtr, route.Router},
|
||||
{address, route.Address},
|
||||
{link, route.Link},
|
||||
}
|
||||
|
||||
for _, v := range values {
|
||||
// attempt to match each value
|
||||
if !match(v.a, v.b) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// filterRoutes finds all the routes for given network and router and returns them
|
||||
func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route {
|
||||
address := opts.Address
|
||||
gateway := opts.Gateway
|
||||
network := opts.Network
|
||||
rtr := opts.Router
|
||||
link := opts.Link
|
||||
|
||||
// routeMap stores the routes we're going to advertise
|
||||
routeMap := make(map[string][]router.Route)
|
||||
|
||||
for _, rt := range routes {
|
||||
// get the actual route
|
||||
route := rt.route
|
||||
|
||||
if isMatch(route, address, gateway, network, rtr, link) {
|
||||
// add matchihg route to the routeMap
|
||||
routeKey := route.Service + "@" + route.Network
|
||||
routeMap[routeKey] = append(routeMap[routeKey], route)
|
||||
}
|
||||
}
|
||||
|
||||
var results []router.Route
|
||||
|
||||
for _, route := range routeMap {
|
||||
results = append(results, route...)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// Lookup queries routing table and returns all routes that match the lookup query
|
||||
func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) {
|
||||
// create new query options
|
||||
opts := router.NewQuery(q...)
|
||||
|
||||
// create a cwslicelist of query results
|
||||
results := make([]router.Route, 0, len(t.routes))
|
||||
|
||||
// readAndFilter routes for this service under read lock.
|
||||
readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
routes, ok := t.routes[q.Service]
|
||||
if !ok || len(routes) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return filterRoutes(routes, q), true
|
||||
}
|
||||
|
||||
if opts.Service != "*" {
|
||||
// try and load services from the cache
|
||||
if routes, ok := readAndFilter(opts); ok {
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
// lookup the route and try again
|
||||
// TODO: move this logic out of the hot path
|
||||
// being hammered on queries will require multiple lookups
|
||||
routes, err := t.lookup(opts.Service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cache the routes
|
||||
for _, rt := range routes {
|
||||
t.Create(rt)
|
||||
}
|
||||
|
||||
// try again
|
||||
if routes, ok := readAndFilter(opts); ok {
|
||||
return routes, nil
|
||||
}
|
||||
func (t *table) Query(service string) ([]router.Route, error) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
routeMap, ok := t.routes[service]
|
||||
if !ok {
|
||||
return nil, router.ErrRouteNotFound
|
||||
}
|
||||
|
||||
// search through all destinations
|
||||
t.RLock()
|
||||
var routes []router.Route
|
||||
|
||||
for _, routes := range t.routes {
|
||||
// filter the routes
|
||||
found := filterRoutes(routes, opts)
|
||||
// ensure we don't append zero length routes
|
||||
if len(found) == 0 {
|
||||
continue
|
||||
}
|
||||
results = append(results, found...)
|
||||
for _, rt := range routeMap {
|
||||
routes = append(routes, rt.route)
|
||||
}
|
||||
|
||||
t.RUnlock()
|
||||
|
||||
return results, nil
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
// Watch returns routing table entry watcher
|
||||
|
235
table_test.go
235
table_test.go
@ -1,15 +1,13 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/v3/router"
|
||||
)
|
||||
|
||||
func testSetup() (*table, router.Route) {
|
||||
routr := NewRouter().(*rtr)
|
||||
table := newTable(routr.lookup)
|
||||
table := newTable()
|
||||
|
||||
route := router.Route{
|
||||
Service: "dest.svc",
|
||||
@ -114,235 +112,20 @@ func TestList(t *testing.T) {
|
||||
func TestQuery(t *testing.T) {
|
||||
table, route := testSetup()
|
||||
|
||||
svc := []string{"svc1", "svc2", "svc3", "svc1"}
|
||||
net := []string{"net1", "net2", "net1", "net3"}
|
||||
gw := []string{"gw1", "gw2", "gw3", "gw3"}
|
||||
rtr := []string{"rtr1", "rt2", "rt3", "rtr3"}
|
||||
|
||||
for i := 0; i < len(svc); i++ {
|
||||
route.Service = svc[i]
|
||||
route.Network = net[i]
|
||||
route.Gateway = gw[i]
|
||||
route.Router = rtr[i]
|
||||
route.Link = router.DefaultLink
|
||||
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Fatalf("error adding route: %s", err)
|
||||
}
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Fatalf("error adding route: %s", err)
|
||||
}
|
||||
|
||||
// return all routes
|
||||
routes, err := table.Query()
|
||||
rt, err := table.Query(route.Service)
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
} else if len(routes) == 0 {
|
||||
t.Fatalf("error looking up routes: not found")
|
||||
t.Fatal("Expected a route got err", err)
|
||||
}
|
||||
|
||||
// query routes particular network
|
||||
network := "net1"
|
||||
|
||||
routes, err = table.Query(router.QueryNetwork(network))
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
if len(rt) != 1 {
|
||||
t.Fatalf("Expected one route got %d", len(rt))
|
||||
}
|
||||
|
||||
if len(routes) != 2 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
|
||||
}
|
||||
|
||||
for _, route := range routes {
|
||||
if route.Network != network {
|
||||
t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
|
||||
}
|
||||
}
|
||||
|
||||
// query routes for particular gateway
|
||||
gateway := "gw1"
|
||||
|
||||
routes, err = table.Query(router.QueryGateway(gateway))
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
}
|
||||
|
||||
if routes[0].Gateway != gateway {
|
||||
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
|
||||
}
|
||||
|
||||
// query routes for particular router
|
||||
rt := "rtr1"
|
||||
|
||||
routes, err = table.Query(router.QueryRouter(rt))
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
}
|
||||
|
||||
if routes[0].Router != rt {
|
||||
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
|
||||
}
|
||||
|
||||
// query particular gateway and network
|
||||
query := []router.QueryOption{
|
||||
router.QueryGateway(gateway),
|
||||
router.QueryNetwork(network),
|
||||
router.QueryRouter(rt),
|
||||
}
|
||||
|
||||
routes, err = table.Query(query...)
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
}
|
||||
|
||||
if routes[0].Gateway != gateway {
|
||||
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
|
||||
}
|
||||
|
||||
if routes[0].Network != network {
|
||||
t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
|
||||
}
|
||||
|
||||
if routes[0].Router != rt {
|
||||
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
|
||||
}
|
||||
|
||||
// non-existen route query
|
||||
routes, err = table.Query(router.QueryService("foobar"))
|
||||
if err != router.ErrRouteNotFound {
|
||||
t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err)
|
||||
}
|
||||
|
||||
if len(routes) != 0 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
|
||||
}
|
||||
|
||||
// query NO routes
|
||||
query = []router.QueryOption{
|
||||
router.QueryGateway(gateway),
|
||||
router.QueryNetwork(network),
|
||||
router.QueryLink("network"),
|
||||
}
|
||||
|
||||
routes, err = table.Query(query...)
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) > 0 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
|
||||
}
|
||||
|
||||
// insert local routes to query
|
||||
for i := 0; i < 2; i++ {
|
||||
route.Link = "foobar"
|
||||
route.Address = fmt.Sprintf("local.route.address-%d", i)
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Fatalf("error adding route: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// query local routes
|
||||
query = []router.QueryOption{
|
||||
router.QueryGateway("*"),
|
||||
router.QueryNetwork("*"),
|
||||
router.QueryLink("foobar"),
|
||||
}
|
||||
|
||||
routes, err = table.Query(query...)
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != 2 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
|
||||
}
|
||||
|
||||
// add two different routes for svcX with different metric
|
||||
for i := 0; i < 2; i++ {
|
||||
route.Service = "svcX"
|
||||
route.Address = fmt.Sprintf("svcX.route.address-%d", i)
|
||||
route.Metric = int64(100 + i)
|
||||
route.Link = router.DefaultLink
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Fatalf("error adding route: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
query = []router.QueryOption{
|
||||
router.QueryService("svcX"),
|
||||
}
|
||||
|
||||
routes, err = table.Query(query...)
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != 2 {
|
||||
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
if rt[0].Hash() != route.Hash() {
|
||||
t.Fatal("Mismatched routes received")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFallback(t *testing.T) {
|
||||
|
||||
r := &rtr{
|
||||
options: router.DefaultOptions(),
|
||||
}
|
||||
route := router.Route{
|
||||
Service: "go.micro.service.foo",
|
||||
Router: r.options.Id,
|
||||
Link: router.DefaultLink,
|
||||
Metric: router.DefaultLocalMetric,
|
||||
}
|
||||
r.table = newTable(func(s string) ([]router.Route, error) {
|
||||
return []router.Route{route}, nil
|
||||
})
|
||||
r.start()
|
||||
|
||||
rts, err := r.Lookup(router.QueryService("go.micro.service.foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up service %s", err)
|
||||
}
|
||||
if len(rts) != 1 {
|
||||
t.Fatalf("incorrect number of routes returned %d", len(rts))
|
||||
}
|
||||
|
||||
// deleting from the table but the next query should invoke the fallback that we passed during new table creation
|
||||
if err := r.table.Delete(route); err != nil {
|
||||
t.Fatalf("error deleting route %s", err)
|
||||
}
|
||||
|
||||
rts, err = r.Lookup(router.QueryService("go.micro.service.foo"))
|
||||
if err != nil {
|
||||
t.Fatalf("error looking up service %s", err)
|
||||
}
|
||||
if len(rts) != 1 {
|
||||
t.Fatalf("incorrect number of routes returned %d", len(rts))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestFallbackError(t *testing.T) {
|
||||
r := &rtr{
|
||||
options: router.DefaultOptions(),
|
||||
}
|
||||
r.table = newTable(func(s string) ([]router.Route, error) {
|
||||
return nil, fmt.Errorf("ERROR")
|
||||
})
|
||||
r.start()
|
||||
_, err := r.Lookup(router.QueryService("go.micro.service.foo"))
|
||||
if err == nil {
|
||||
t.Fatalf("expected error looking up service but none returned")
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user