Registry router fixes (#1961)

* only cache routes if told to do so

* Use roundrobin selector and retry in proxy

* Update lookup to require service

* Fix compile

* Fix compile

* Update

* Update

* rename query to lookup

* Update router.go

* Update
This commit is contained in:
Asim Aslam 2020-08-21 09:23:01 +01:00 committed by GitHub
parent 78a79ca9e1
commit f146b52418
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 267 additions and 558 deletions

View File

@ -19,16 +19,16 @@ func LookupRoute(ctx context.Context, req Request, opts CallOptions) ([]string,
} }
// construct the router query // construct the router query
query := []router.QueryOption{router.QueryService(req.Service())} query := []router.LookupOption{}
// if a custom network was requested, pass this to the router. By default the router will use it's // if a custom network was requested, pass this to the router. By default the router will use it's
// own network, which is set during initialisation. // own network, which is set during initialisation.
if len(opts.Network) > 0 { if len(opts.Network) > 0 {
query = append(query, router.QueryNetwork(opts.Network)) query = append(query, router.LookupNetwork(opts.Network))
} }
// lookup the routes which can be used to execute the request // lookup the routes which can be used to execute the request
routes, err := opts.Router.Lookup(query...) routes, err := opts.Router.Lookup(req.Service(), query...)
if err == router.ErrRouteNotFound { if err == router.ErrRouteNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error()) return nil, errors.InternalServerError("go.micro.client", "service %s: %s", req.Service(), err.Error())
} else if err != nil { } else if err != nil {

View File

@ -11,7 +11,7 @@ import (
"github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/router"
regRouter "github.com/micro/go-micro/v3/router/registry" regRouter "github.com/micro/go-micro/v3/router/registry"
"github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/selector"
"github.com/micro/go-micro/v3/selector/random" "github.com/micro/go-micro/v3/selector/roundrobin"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
thttp "github.com/micro/go-micro/v3/transport/http" thttp "github.com/micro/go-micro/v3/transport/http"
) )
@ -125,7 +125,7 @@ func NewOptions(options ...Option) Options {
PoolTTL: DefaultPoolTTL, PoolTTL: DefaultPoolTTL,
Broker: http.NewBroker(), Broker: http.NewBroker(),
Router: regRouter.NewRouter(), Router: regRouter.NewRouter(),
Selector: random.NewSelector(), Selector: roundrobin.NewSelector(),
Transport: thttp.NewTransport(), Transport: thttp.NewTransport(),
} }

View File

@ -961,12 +961,11 @@ func (n *mucpNetwork) processNetChan(listener tunnel.Listener) {
route.Metric = d route.Metric = d
} }
q := []router.QueryOption{ q := []router.LookupOption{
router.QueryService(route.Service), router.LookupLink(route.Link),
router.QueryLink(route.Link),
} }
routes, err := n.router.Table().Query(q...) routes, err := n.router.Lookup(route.Service, q...)
if err != nil && err != router.ErrRouteNotFound { if err != nil && err != router.ErrRouteNotFound {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Network node %s failed listing best routes for %s: %v", n.id, route.Service, err) logger.Debugf("Network node %s failed listing best routes for %s: %v", n.id, route.Service, err)
@ -1079,16 +1078,15 @@ func (n *mucpNetwork) processNetChan(listener tunnel.Listener) {
} }
// pruneRoutes prunes routes return by given query // pruneRoutes prunes routes return by given query
func (n *mucpNetwork) pruneRoutes(q ...router.QueryOption) error { func (n *mucpNetwork) pruneRoutes(q ...router.LookupOption) error {
routes, err := n.router.Table().Query(q...) routes, err := n.router.Table().List()
if err != nil && err != router.ErrRouteNotFound { if err != nil && err != router.ErrRouteNotFound {
return err return err
} }
for _, route := range routes { // filter and delete the routes in question
if err := n.router.Table().Delete(route); err != nil && err != router.ErrRouteNotFound { for _, route := range router.Filter(routes, router.NewLookup(q...)) {
return err n.router.Table().Delete(route)
}
} }
return nil return nil
@ -1097,18 +1095,18 @@ func (n *mucpNetwork) pruneRoutes(q ...router.QueryOption) error {
// pruneNodeRoutes prunes routes that were either originated by or routable via given node // pruneNodeRoutes prunes routes that were either originated by or routable via given node
func (n *mucpNetwork) prunePeerRoutes(peer *node) error { func (n *mucpNetwork) prunePeerRoutes(peer *node) error {
// lookup all routes originated by router // lookup all routes originated by router
q := []router.QueryOption{ q := []router.LookupOption{
router.QueryRouter(peer.id), router.LookupRouter(peer.id),
router.QueryLink("*"), router.LookupLink("*"),
} }
if err := n.pruneRoutes(q...); err != nil { if err := n.pruneRoutes(q...); err != nil {
return err return err
} }
// lookup all routes routable via gw // lookup all routes routable via gw
q = []router.QueryOption{ q = []router.LookupOption{
router.QueryGateway(peer.address), router.LookupGateway(peer.address),
router.QueryLink("*"), router.LookupLink("*"),
} }
if err := n.pruneRoutes(q...); err != nil { if err := n.pruneRoutes(q...); err != nil {
return err return err
@ -1291,7 +1289,7 @@ func (n *mucpNetwork) manage() {
} }
// otherwise delete all the routes originated by it // otherwise delete all the routes originated by it
if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil { if err := n.pruneRoutes(router.LookupRouter(route.Router)); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Network failed deleting routes by %s: %v", route.Router, err) logger.Debugf("Network failed deleting routes by %s: %v", route.Router, err)
} }

View File

@ -94,10 +94,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
logger.Tracef("Proxy received request for %s %s", service, endpoint) logger.Tracef("Proxy received request for %s %s", service, endpoint)
} }
// no retries with the proxy var opts []client.CallOption
opts := []client.CallOption{
client.WithRetries(0),
}
// call a specific backend // call a specific backend
if len(p.Endpoint) > 0 { if len(p.Endpoint) > 0 {

View File

@ -208,7 +208,7 @@ func (p *Proxy) getRoute(ctx context.Context, service string) ([]router.Route, e
func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) { func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) {
// lookup the routes in the router // lookup the routes in the router
results, err := p.Router.Lookup(router.QueryService(service), router.QueryNetwork("*")) results, err := p.Router.Lookup(service, router.LookupNetwork("*"))
if err != nil { if err != nil {
// assumption that we're ok with stale routes // assumption that we're ok with stale routes
logger.Debugf("Failed to lookup route for %s: %v", service, err) logger.Debugf("Failed to lookup route for %s: %v", service, err)

View File

@ -2,9 +2,9 @@
package registry package registry
import ( import (
"github.com/micro/go-micro/v3/resolver"
"github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/registry/mdns" "github.com/micro/go-micro/v3/registry/mdns"
"github.com/micro/go-micro/v3/resolver"
) )
// Resolver is a registry network resolver // Resolver is a registry network resolver

View File

@ -17,19 +17,17 @@ func NewRouter(opts ...router.Option) router.Router {
if len(options.Network) == 0 { if len(options.Network) == 0 {
options.Network = "micro" options.Network = "micro"
} }
return &dns{options, &table{options}} return &dns{options}
} }
type dns struct { type dns struct {
options router.Options options router.Options
table *table
} }
func (d *dns) Init(opts ...router.Option) error { func (d *dns) Init(opts ...router.Option) error {
for _, o := range opts { for _, o := range opts {
o(&d.options) o(&d.options)
} }
d.table.options = d.options
return nil return nil
} }
@ -38,50 +36,16 @@ func (d *dns) Options() router.Options {
} }
func (d *dns) Table() router.Table { func (d *dns) Table() router.Table {
return d.table return nil
}
func (d *dns) Lookup(opts ...router.QueryOption) ([]router.Route, error) {
return d.table.Query(opts...)
}
func (d *dns) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil
} }
func (d *dns) Close() error { func (d *dns) Close() error {
return nil return nil
} }
func (d *dns) String() string { func (d *dns) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
return "dns"
}
type table struct {
options router.Options
}
func (t *table) Create(router.Route) error {
return nil
}
func (t *table) Delete(router.Route) error {
return nil
}
func (t *table) Update(router.Route) error {
return nil
}
func (t *table) List() ([]router.Route, error) {
return nil, nil
}
func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
options := router.NewQuery(opts...)
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
host, port, err := net.SplitHostPort(options.Service) host, port, err := net.SplitHostPort(service)
if err == nil { if err == nil {
// lookup the service using A records // lookup the service using A records
ips, err := net.LookupHost(host) ips, err := net.LookupHost(host)
@ -95,7 +59,7 @@ func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
result := make([]router.Route, len(ips)) result := make([]router.Route, len(ips))
for i, ip := range ips { for i, ip := range ips {
result[i] = router.Route{ result[i] = router.Route{
Service: options.Service, Service: service,
Address: fmt.Sprintf("%s:%d", ip, uint16(p)), Address: fmt.Sprintf("%s:%d", ip, uint16(p)),
} }
} }
@ -104,7 +68,7 @@ func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
// we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the // we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the
// service using the SRV record, we return the error. // service using the SRV record, we return the error.
_, nodes, err := net.LookupSRV(options.Service, "tcp", t.options.Network) _, nodes, err := net.LookupSRV(service, "tcp", d.options.Network)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,10 +77,18 @@ func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
result := make([]router.Route, len(nodes)) result := make([]router.Route, len(nodes))
for i, n := range nodes { for i, n := range nodes {
result[i] = router.Route{ result[i] = router.Route{
Service: options.Service, Service: service,
Address: fmt.Sprintf("%s:%d", n.Target, n.Port), Address: fmt.Sprintf("%s:%d", n.Target, n.Port),
Network: t.options.Network, Network: d.options.Network,
} }
} }
return result, nil return result, nil
} }
func (d *dns) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil
}
func (d *dns) String() string {
return "dns"
}

View File

@ -42,19 +42,19 @@ func (m *mdnsRouter) Table() router.Table {
return nil return nil
} }
func (m *mdnsRouter) Lookup(opts ...router.QueryOption) ([]router.Route, error) { func (m *mdnsRouter) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
options := router.NewQuery(opts...) options := router.NewLookup(opts...)
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000 // check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
service, port, err := net.SplitHostPort(options.Service) srv, port, err := net.SplitHostPort(service)
if err != nil { if err != nil {
service = options.Service srv = service
} }
// query for the host // query for the host
entries := make(chan *mdns.ServiceEntry) entries := make(chan *mdns.ServiceEntry)
p := mdns.DefaultParams(service) p := mdns.DefaultParams(srv)
p.Timeout = time.Millisecond * 100 p.Timeout = time.Millisecond * 100
p.Entries = entries p.Entries = entries

View File

@ -22,8 +22,8 @@ type Options struct {
Registry registry.Registry Registry registry.Registry
// Context for additional options // Context for additional options
Context context.Context Context context.Context
// Precache routes // Cache routes
Precache bool Cache bool
} }
// Id sets Router Id // Id sets Router Id
@ -61,10 +61,10 @@ func Registry(r registry.Registry) Option {
} }
} }
// Precache the routes // Cache the routes
func Precache() Option { func Cache() Option {
return func(o *Options) { return func(o *Options) {
o.Precache = true o.Cache = true
} }
} }

View File

@ -1,13 +1,11 @@
package router package router
// QueryOption sets routing table query options // LookupOption sets routing table query options
type QueryOption func(*QueryOptions) type LookupOption func(*LookupOptions)
// QueryOptions are routing table query options // LookupOptions are routing table query options
// TODO replace with Filter(Route) bool // TODO replace with Filter(Route) bool
type QueryOptions struct { type LookupOptions struct {
// Service is destination service name
Service string
// Address of the service // Address of the service
Address string Address string
// Gateway is route gateway // Gateway is route gateway
@ -20,53 +18,45 @@ type QueryOptions struct {
Link string Link string
} }
// QueryService sets service to query // LookupAddress sets service to query
func QueryService(s string) QueryOption { func LookupAddress(a string) LookupOption {
return func(o *QueryOptions) { return func(o *LookupOptions) {
o.Service = s
}
}
// QueryAddress sets service to query
func QueryAddress(a string) QueryOption {
return func(o *QueryOptions) {
o.Address = a o.Address = a
} }
} }
// QueryGateway sets gateway address to query // LookupGateway sets gateway address to query
func QueryGateway(g string) QueryOption { func LookupGateway(g string) LookupOption {
return func(o *QueryOptions) { return func(o *LookupOptions) {
o.Gateway = g o.Gateway = g
} }
} }
// QueryNetwork sets network name to query // LookupNetwork sets network name to query
func QueryNetwork(n string) QueryOption { func LookupNetwork(n string) LookupOption {
return func(o *QueryOptions) { return func(o *LookupOptions) {
o.Network = n o.Network = n
} }
} }
// QueryRouter sets router id to query // LookupRouter sets router id to query
func QueryRouter(r string) QueryOption { func LookupRouter(r string) LookupOption {
return func(o *QueryOptions) { return func(o *LookupOptions) {
o.Router = r o.Router = r
} }
} }
// QueryLink sets the link to query // LookupLink sets the link to query
func QueryLink(link string) QueryOption { func LookupLink(link string) LookupOption {
return func(o *QueryOptions) { return func(o *LookupOptions) {
o.Link = link o.Link = link
} }
} }
// NewQuery creates new query and returns it // NewLookup creates new query and returns it
func NewQuery(opts ...QueryOption) QueryOptions { func NewLookup(opts ...LookupOption) LookupOptions {
// default options // default options
qopts := QueryOptions{ qopts := LookupOptions{
Service: "*",
Address: "*", Address: "*",
Gateway: "*", Gateway: "*",
Network: "*", Network: "*",
@ -80,3 +70,66 @@ func NewQuery(opts ...QueryOption) QueryOptions {
return qopts return qopts
} }
// isMatch checks if the route matches given query options
func isMatch(route Route, address, gateway, network, rtr, link string) bool {
// matches the values provided
match := func(a, b string) bool {
if a == "*" || b == "*" || a == b {
return true
}
return false
}
// a simple struct to hold our values
type compare struct {
a string
b string
}
// compare the following values
values := []compare{
{gateway, route.Gateway},
{network, route.Network},
{rtr, route.Router},
{address, route.Address},
{link, route.Link},
}
for _, v := range values {
// attempt to match each value
if !match(v.a, v.b) {
return false
}
}
return true
}
// filterRoutes finds all the routes for given network and router and returns them
func Filter(routes []Route, opts LookupOptions) []Route {
address := opts.Address
gateway := opts.Gateway
network := opts.Network
rtr := opts.Router
link := opts.Link
// routeMap stores the routes we're going to advertise
routeMap := make(map[string][]Route)
for _, route := range routes {
if isMatch(route, address, gateway, network, rtr, link) {
// add matchihg route to the routeMap
routeKey := route.Service + "@" + route.Network
routeMap[routeKey] = append(routeMap[routeKey], route)
}
}
var results []Route
for _, route := range routeMap {
results = append(results, route...)
}
return results
}

View File

@ -47,7 +47,7 @@ func NewRouter(opts ...router.Option) router.Router {
// create the new table, passing the fetchRoute method in as a fallback if // create the new table, passing the fetchRoute method in as a fallback if
// the table doesn't contain the result for a query. // the table doesn't contain the result for a query.
r.table = newTable(r.lookup) r.table = newTable()
// start the router // start the router
r.start() r.start()
@ -241,8 +241,41 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
return nil return nil
} }
// Close the router
func (r *rtr) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
}
r.running = false
return nil
}
// lookup retrieves all the routes for a given service and creates them in the routing table // lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) lookup(service string) ([]router.Route, error) { func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
q := router.NewLookup(opts...)
// if we find the routes filter and return them
routes, err := r.table.Query(service)
if err == nil {
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
}
return routes, nil
}
// lookup the route
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain) logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain)) services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
@ -254,8 +287,6 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
return nil, fmt.Errorf("failed getting services: %v", err) return nil, fmt.Errorf("failed getting services: %v", err)
} }
var routes []router.Route
for _, srv := range services { for _, srv := range services {
domain := getDomain(srv) domain := getDomain(srv)
// TODO: should we continue to send the event indicating we created a route? // TODO: should we continue to send the event indicating we created a route?
@ -263,6 +294,17 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
routes = append(routes, r.createRoutes(srv, domain)...) routes = append(routes, r.createRoutes(srv, domain)...)
} }
// if we're supposed to cache then save the routes
if r.options.Cache {
for _, route := range routes {
r.table.Create(route)
}
}
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
}
return routes, nil return routes, nil
} }
@ -324,13 +366,6 @@ func (r *rtr) start() error {
return nil return nil
} }
if r.options.Precache {
// add all local service routes into the routing table
if err := r.loadRoutes(r.options.Registry); err != nil {
return fmt.Errorf("failed loading registry routes: %s", err)
}
}
// add default gateway into routing table // add default gateway into routing table
if r.options.Gateway != "" { if r.options.Gateway != "" {
// note, the only non-default value is the gateway // note, the only non-default value is the gateway
@ -350,25 +385,59 @@ func (r *rtr) start() error {
// create error and exit channels // create error and exit channels
r.exit = make(chan bool) r.exit = make(chan bool)
r.running = true
// periodically refresh all the routes // only cache if told to do so
if !r.options.Cache {
return nil
}
// create a refresh notify channel
refresh := make(chan bool, 1)
// fires the refresh for loading routes
refreshRoutes := func() {
select {
case refresh <- true:
default:
}
}
// refresh all the routes in the event of a failure watching the registry
go func() { go func() {
t1 := time.NewTicker(RefreshInterval) var lastRefresh time.Time
defer t1.Stop()
t2 := time.NewTicker(PruneInterval) // load a refresh
defer t2.Stop() refreshRoutes()
for { for {
select { select {
case <-r.exit: case <-r.exit:
return return
case <-t2.C: case <-refresh:
r.table.pruneRoutes(RefreshInterval) // don't refresh if we've done so in the past minute
case <-t1.C: if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute {
continue
}
// load new routes
if err := r.loadRoutes(r.options.Registry); err != nil { if err := r.loadRoutes(r.options.Registry); err != nil {
logger.Debugf("failed refreshing registry routes: %s", err) logger.Debugf("failed refreshing registry routes: %s", err)
// in this don't prune
continue
} }
// first time so nothing to prune
if !lastRefresh.IsZero() {
// prune any routes since last refresh since we've
// updated basically everything we care about
r.table.pruneRoutes(time.Since(lastRefresh))
}
// update the refresh time
lastRefresh = time.Now()
case <-time.After(RefreshInterval):
refreshRoutes()
} }
} }
}() }()
@ -386,6 +455,8 @@ func (r *rtr) start() error {
logger.Debugf("failed creating registry watcher: %v", err) logger.Debugf("failed creating registry watcher: %v", err)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
// in the event of an error reload routes
refreshRoutes()
continue continue
} }
@ -395,46 +466,21 @@ func (r *rtr) start() error {
logger.Debugf("Error watching the registry: %v", err) logger.Debugf("Error watching the registry: %v", err)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
// in the event of an error reload routes
refreshRoutes()
} }
} }
} }
}() }()
r.running = true
return nil return nil
} }
// Lookup routes in the routing table
func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) {
return r.Table().Query(q...)
}
// Watch routes // Watch routes
func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) { func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return r.table.Watch(opts...) return r.table.Watch(opts...)
} }
// Close the router
func (r *rtr) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
}
r.running = false
return nil
}
// String prints debugging information about router // String prints debugging information about router
func (r *rtr) String() string { func (r *rtr) String() string {
return "registry" return "registry"

View File

@ -12,8 +12,6 @@ import (
// table is an in-memory routing table // table is an in-memory routing table
type table struct { type table struct {
sync.RWMutex sync.RWMutex
// lookup for a service
lookup func(string) ([]router.Route, error)
// routes stores service routes // routes stores service routes
routes map[string]map[uint64]*route routes map[string]map[uint64]*route
// watchers stores table watchers // watchers stores table watchers
@ -26,9 +24,8 @@ type route struct {
} }
// newtable creates a new routing table and returns it // newtable creates a new routing table and returns it
func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table { func newTable() *table {
return &table{ return &table{
lookup: lookup,
routes: make(map[string]map[uint64]*route), routes: make(map[string]map[uint64]*route),
watchers: make(map[string]*tableWatcher), watchers: make(map[string]*tableWatcher),
} }
@ -216,136 +213,23 @@ func (t *table) List() ([]router.Route, error) {
return routes, nil return routes, nil
} }
// isMatch checks if the route matches given query options
func isMatch(route router.Route, address, gateway, network, rtr, link string) bool {
// matches the values provided
match := func(a, b string) bool {
if a == "*" || b == "*" || a == b {
return true
}
return false
}
// a simple struct to hold our values
type compare struct {
a string
b string
}
// compare the following values
values := []compare{
{gateway, route.Gateway},
{network, route.Network},
{rtr, route.Router},
{address, route.Address},
{link, route.Link},
}
for _, v := range values {
// attempt to match each value
if !match(v.a, v.b) {
return false
}
}
return true
}
// filterRoutes finds all the routes for given network and router and returns them
func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route {
address := opts.Address
gateway := opts.Gateway
network := opts.Network
rtr := opts.Router
link := opts.Link
// routeMap stores the routes we're going to advertise
routeMap := make(map[string][]router.Route)
for _, rt := range routes {
// get the actual route
route := rt.route
if isMatch(route, address, gateway, network, rtr, link) {
// add matchihg route to the routeMap
routeKey := route.Service + "@" + route.Network
routeMap[routeKey] = append(routeMap[routeKey], route)
}
}
var results []router.Route
for _, route := range routeMap {
results = append(results, route...)
}
return results
}
// Lookup queries routing table and returns all routes that match the lookup query // Lookup queries routing table and returns all routes that match the lookup query
func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) { func (t *table) Query(service string) ([]router.Route, error) {
// create new query options t.RLock()
opts := router.NewQuery(q...) defer t.RUnlock()
// create a cwslicelist of query results
results := make([]router.Route, 0, len(t.routes))
// readAndFilter routes for this service under read lock.
readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) {
t.RLock()
defer t.RUnlock()
routes, ok := t.routes[q.Service]
if !ok || len(routes) == 0 {
return nil, false
}
return filterRoutes(routes, q), true
}
if opts.Service != "*" {
// try and load services from the cache
if routes, ok := readAndFilter(opts); ok {
return routes, nil
}
// lookup the route and try again
// TODO: move this logic out of the hot path
// being hammered on queries will require multiple lookups
routes, err := t.lookup(opts.Service)
if err != nil {
return nil, err
}
// cache the routes
for _, rt := range routes {
t.Create(rt)
}
// try again
if routes, ok := readAndFilter(opts); ok {
return routes, nil
}
routeMap, ok := t.routes[service]
if !ok {
return nil, router.ErrRouteNotFound return nil, router.ErrRouteNotFound
} }
// search through all destinations var routes []router.Route
t.RLock()
for _, routes := range t.routes { for _, rt := range routeMap {
// filter the routes routes = append(routes, rt.route)
found := filterRoutes(routes, opts)
// ensure we don't append zero length routes
if len(found) == 0 {
continue
}
results = append(results, found...)
} }
t.RUnlock() return routes, nil
return results, nil
} }
// Watch returns routing table entry watcher // Watch returns routing table entry watcher

View File

@ -1,15 +1,13 @@
package registry package registry
import ( import (
"fmt"
"testing" "testing"
"github.com/micro/go-micro/v3/router" "github.com/micro/go-micro/v3/router"
) )
func testSetup() (*table, router.Route) { func testSetup() (*table, router.Route) {
routr := NewRouter().(*rtr) table := newTable()
table := newTable(routr.lookup)
route := router.Route{ route := router.Route{
Service: "dest.svc", Service: "dest.svc",
@ -114,235 +112,20 @@ func TestList(t *testing.T) {
func TestQuery(t *testing.T) { func TestQuery(t *testing.T) {
table, route := testSetup() table, route := testSetup()
svc := []string{"svc1", "svc2", "svc3", "svc1"} if err := table.Create(route); err != nil {
net := []string{"net1", "net2", "net1", "net3"} t.Fatalf("error adding route: %s", err)
gw := []string{"gw1", "gw2", "gw3", "gw3"}
rtr := []string{"rtr1", "rt2", "rt3", "rtr3"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
route.Router = rtr[i]
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
} }
// return all routes rt, err := table.Query(route.Service)
routes, err := table.Query()
if err != nil { if err != nil {
t.Fatalf("error looking up routes: %s", err) t.Fatal("Expected a route got err", err)
} else if len(routes) == 0 {
t.Fatalf("error looking up routes: not found")
} }
// query routes particular network if len(rt) != 1 {
network := "net1" t.Fatalf("Expected one route got %d", len(rt))
routes, err = table.Query(router.QueryNetwork(network))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
} }
if len(routes) != 2 { if rt[0].Hash() != route.Hash() {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) t.Fatal("Mismatched routes received")
}
for _, route := range routes {
if route.Network != network {
t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
}
}
// query routes for particular gateway
gateway := "gw1"
routes, err = table.Query(router.QueryGateway(gateway))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
// query routes for particular router
rt := "rtr1"
routes, err = table.Query(router.QueryRouter(rt))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
}
// query particular gateway and network
query := []router.QueryOption{
router.QueryGateway(gateway),
router.QueryNetwork(network),
router.QueryRouter(rt),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
if routes[0].Network != network {
t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
}
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
}
// non-existen route query
routes, err = table.Query(router.QueryService("foobar"))
if err != router.ErrRouteNotFound {
t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err)
}
if len(routes) != 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
// query NO routes
query = []router.QueryOption{
router.QueryGateway(gateway),
router.QueryNetwork(network),
router.QueryLink("network"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) > 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
// insert local routes to query
for i := 0; i < 2; i++ {
route.Link = "foobar"
route.Address = fmt.Sprintf("local.route.address-%d", i)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
// query local routes
query = []router.QueryOption{
router.QueryGateway("*"),
router.QueryNetwork("*"),
router.QueryLink("foobar"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
}
// add two different routes for svcX with different metric
for i := 0; i < 2; i++ {
route.Service = "svcX"
route.Address = fmt.Sprintf("svcX.route.address-%d", i)
route.Metric = int64(100 + i)
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
query = []router.QueryOption{
router.QueryService("svcX"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
} }
} }
func TestFallback(t *testing.T) {
r := &rtr{
options: router.DefaultOptions(),
}
route := router.Route{
Service: "go.micro.service.foo",
Router: r.options.Id,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
}
r.table = newTable(func(s string) ([]router.Route, error) {
return []router.Route{route}, nil
})
r.start()
rts, err := r.Lookup(router.QueryService("go.micro.service.foo"))
if err != nil {
t.Fatalf("error looking up service %s", err)
}
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
}
// deleting from the table but the next query should invoke the fallback that we passed during new table creation
if err := r.table.Delete(route); err != nil {
t.Fatalf("error deleting route %s", err)
}
rts, err = r.Lookup(router.QueryService("go.micro.service.foo"))
if err != nil {
t.Fatalf("error looking up service %s", err)
}
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
}
}
func TestFallbackError(t *testing.T) {
r := &rtr{
options: router.DefaultOptions(),
}
r.table = newTable(func(s string) ([]router.Route, error) {
return nil, fmt.Errorf("ERROR")
})
r.start()
_, err := r.Lookup(router.QueryService("go.micro.service.foo"))
if err == nil {
t.Fatalf("expected error looking up service but none returned")
}
}

View File

@ -23,7 +23,7 @@ type Router interface {
// The routing table // The routing table
Table() Table Table() Table
// Lookup queries routes in the routing table // Lookup queries routes in the routing table
Lookup(...QueryOption) ([]Route, error) Lookup(service string, opts ...LookupOption) ([]Route, error)
// Watch returns a watcher which tracks updates to the routing table // Watch returns a watcher which tracks updates to the routing table
Watch(opts ...WatchOption) (Watcher, error) Watch(opts ...WatchOption) (Watcher, error)
// Close the router // Close the router
@ -43,7 +43,7 @@ type Table interface {
// List all routes in the table // List all routes in the table
List() ([]Route, error) List() ([]Route, error)
// Query routes in the routing table // Query routes in the routing table
Query(...QueryOption) ([]Route, error) Query(service string) ([]Route, error)
} }
// Option used by the router // Option used by the router

View File

@ -10,12 +10,11 @@ func NewRouter(opts ...router.Option) router.Router {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
return &static{options, new(table)} return &static{options}
} }
type static struct { type static struct {
options router.Options options router.Options
table router.Table
} }
func (s *static) Init(opts ...router.Option) error { func (s *static) Init(opts ...router.Option) error {
@ -33,8 +32,18 @@ func (s *static) Table() router.Table {
return nil return nil
} }
func (s *static) Lookup(opts ...router.QueryOption) ([]router.Route, error) { func (s *static) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
return s.table.Query(opts...) options := router.NewLookup(opts...)
return []router.Route{
router.Route{
Address: service,
Service: options.Address,
Gateway: options.Gateway,
Network: options.Network,
Router: options.Router,
},
}, nil
} }
func (s *static) Watch(opts ...router.WatchOption) (router.Watcher, error) { func (s *static) Watch(opts ...router.WatchOption) (router.Watcher, error) {
@ -48,35 +57,3 @@ func (s *static) Close() error {
func (s *static) String() string { func (s *static) String() string {
return "static" return "static"
} }
type table struct{}
func (t *table) Create(router.Route) error {
return nil
}
func (t *table) Delete(router.Route) error {
return nil
}
func (t *table) Update(router.Route) error {
return nil
}
func (t *table) List() ([]router.Route, error) {
return nil, nil
}
func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
options := router.NewQuery(opts...)
return []router.Route{
router.Route{
Address: options.Service,
Service: options.Address,
Gateway: options.Gateway,
Network: options.Network,
Router: options.Router,
},
}, nil
}

View File

@ -4,9 +4,9 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/server" "github.com/micro/go-micro/v3/server"
"github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/transport" "github.com/micro/go-micro/v3/transport"
) )
@ -31,10 +31,10 @@ type subscriber struct {
} }
func newMessage(msg transport.Message) *broker.Message { func newMessage(msg transport.Message) *broker.Message {
return &broker.Message{ return &broker.Message{
Header: msg.Header, Header: msg.Header,
Body: msg.Body, Body: msg.Body,
} }
} }
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"net/http" "net/http"
"github.com/micro/go-micro/v3/router"
"github.com/micro/go-micro/v3/selector" "github.com/micro/go-micro/v3/selector"
) )
@ -15,7 +14,7 @@ type roundTripper struct {
} }
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
routes, err := r.opts.Router.Lookup(router.QueryService(req.URL.Host)) routes, err := r.opts.Router.Lookup(req.URL.Host)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -10,7 +10,7 @@ type apiRouter struct {
router.Router router.Router
} }
func (r *apiRouter) Lookup(...router.QueryOption) ([]router.Route, error) { func (r *apiRouter) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
return r.routes, nil return r.routes, nil
} }