split router implementations

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-08-28 12:19:47 +03:00
parent 2c136b005e
commit 9c695ac343
8 changed files with 0 additions and 1585 deletions

View File

@ -1,122 +0,0 @@
package dns
import (
"fmt"
"net"
"strconv"
"github.com/unistack-org/micro/v3/router"
)
// NewRouter returns an initialized dns router
func NewRouter(opts ...router.Option) router.Router {
options := router.DefaultOptions()
for _, o := range opts {
o(&options)
}
if len(options.Network) == 0 {
options.Network = "micro"
}
return &dns{options, &table{options}}
}
type dns struct {
options router.Options
table *table
}
func (d *dns) Init(opts ...router.Option) error {
for _, o := range opts {
o(&d.options)
}
d.table.options = d.options
return nil
}
func (d *dns) Options() router.Options {
return d.options
}
func (d *dns) Table() router.Table {
return d.table
}
func (d *dns) Lookup(opts ...router.QueryOption) ([]router.Route, error) {
return d.table.Query(opts...)
}
func (d *dns) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil
}
func (d *dns) Close() error {
return nil
}
func (d *dns) String() string {
return "dns"
}
type table struct {
options router.Options
}
func (t *table) Create(router.Route) error {
return nil
}
func (t *table) Delete(router.Route) error {
return nil
}
func (t *table) Update(router.Route) error {
return nil
}
func (t *table) List() ([]router.Route, error) {
return nil, nil
}
func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
options := router.NewQuery(opts...)
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
host, port, err := net.SplitHostPort(options.Service)
if err == nil {
// lookup the service using A records
ips, err := net.LookupHost(host)
if err != nil {
return nil, err
}
p, _ := strconv.Atoi(port)
// convert the ip addresses to routes
result := make([]router.Route, len(ips))
for i, ip := range ips {
result[i] = router.Route{
Service: options.Service,
Address: fmt.Sprintf("%s:%d", ip, uint16(p)),
}
}
return result, nil
}
// we didn't get the port so we'll lookup the service using SRV records. If we can't lookup the
// service using the SRV record, we return the error.
_, nodes, err := net.LookupSRV(options.Service, "tcp", t.options.Network)
if err != nil {
return nil, err
}
// convert the nodes (net services) to routes
result := make([]router.Route, len(nodes))
for i, n := range nodes {
result[i] = router.Route{
Service: options.Service,
Address: fmt.Sprintf("%s:%d", n.Target, n.Port),
Network: t.options.Network,
}
}
return result, nil
}

View File

@ -1,117 +0,0 @@
// Package mdns is an mdns router
package mdns
import (
"fmt"
"net"
"strconv"
"time"
"github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/util/mdns"
)
// NewRouter returns an initialized dns router
func NewRouter(opts ...router.Option) router.Router {
options := router.DefaultOptions()
for _, o := range opts {
o(&options)
}
if len(options.Network) == 0 {
options.Network = "micro"
}
return &mdnsRouter{options}
}
type mdnsRouter struct {
options router.Options
}
func (m *mdnsRouter) Init(opts ...router.Option) error {
for _, o := range opts {
o(&m.options)
}
return nil
}
func (m *mdnsRouter) Options() router.Options {
return m.options
}
func (m *mdnsRouter) Table() router.Table {
return nil
}
func (m *mdnsRouter) Lookup(opts ...router.QueryOption) ([]router.Route, error) {
options := router.NewQuery(opts...)
// check to see if we have the port provided in the service, e.g. go-micro-srv-foo:8000
service, port, err := net.SplitHostPort(options.Service)
if err != nil {
service = options.Service
}
// query for the host
entries := make(chan *mdns.ServiceEntry)
p := mdns.DefaultParams(service)
p.Timeout = time.Millisecond * 100
p.Entries = entries
// check if we're using our own network
if len(options.Network) > 0 {
p.Domain = options.Network
}
// do the query
if err := mdns.Query(p); err != nil {
return nil, err
}
var routes []router.Route
// compose the routes based on the entries
for e := range entries {
addr := e.Host
// prefer ipv4 addrs
if len(e.AddrV4) > 0 {
addr = e.AddrV4.String()
// else use ipv6
} else if len(e.AddrV6) > 0 {
addr = "[" + e.AddrV6.String() + "]"
} else if len(addr) == 0 {
continue
}
pt := 443
if e.Port > 0 {
pt = e.Port
}
// set the port
if len(port) > 0 {
pt, _ = strconv.Atoi(port)
}
routes = append(routes, router.Route{
Service: service,
Address: fmt.Sprintf("%s:%d", addr, pt),
Network: p.Domain,
})
}
return routes, nil
}
func (m *mdnsRouter) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil
}
func (m *mdnsRouter) Close() error {
return nil
}
func (m *mdnsRouter) String() string {
return "mdns"
}

View File

@ -1,445 +0,0 @@
package registry
import (
"fmt"
"strings"
"sync"
"time"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/router"
)
var (
// RefreshInterval is the time at which we completely refresh the table
RefreshInterval = time.Second * 120
// PruneInterval is how often we prune the routing table
PruneInterval = time.Second * 10
)
// rtr implements router interface
type rtr struct {
sync.RWMutex
running bool
table *table
options router.Options
exit chan bool
initChan chan bool
}
// NewRouter creates new router and returns it
func NewRouter(opts ...router.Option) (router.Router, error) {
// get default options
options := router.DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
// construct the router
r := &rtr{
options: options,
initChan: make(chan bool),
}
if options.Registry == nil {
return nil, fmt.Errorf("registry not set")
}
// create the new table, passing the fetchRoute method in as a fallback if
// the table doesn't contain the result for a query.
r.table = newTable(r.lookup)
// start the router
r.start()
return r, nil
}
// Init initializes router with given options
func (r *rtr) Init(opts ...router.Option) error {
r.Lock()
for _, o := range opts {
o(&r.options)
}
r.Unlock()
// push a message to the init chan so the watchers
// can reset in the case the registry was changed
go func() {
r.initChan <- true
}()
return nil
}
// Options returns router options
func (r *rtr) Options() router.Options {
r.RLock()
defer r.RUnlock()
options := r.options
return options
}
// Table returns routing table
func (r *rtr) Table() router.Table {
r.Lock()
defer r.Unlock()
return r.table
}
func getDomain(srv *registry.Service) string {
// check the service metadata for domain
// TODO: domain as Domain field in registry?
if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 {
return srv.Metadata["domain"]
} else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil {
return srv.Nodes[0].Metadata["domain"]
}
// otherwise return wildcard
// TODO: return GlobalDomain or PublicDomain
return registry.DefaultDomain
}
// manageRoute applies action on a given route
func (r *rtr) manageRoute(route router.Route, action string) error {
switch action {
case "create":
if err := r.table.Create(route); err != nil && err != router.ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
}
case "delete":
if err := r.table.Delete(route); err != nil && err != router.ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
}
case "update":
if err := r.table.Update(route); err != nil {
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
}
default:
return fmt.Errorf("failed to manage route for service %s: unknown action %s", route.Service, action)
}
return nil
}
// createRoutes turns a service into a list routes basically converting nodes to routes
func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route {
routes := make([]router.Route, 0, len(service.Nodes))
for _, node := range service.Nodes {
routes = append(routes, router.Route{
Service: service.Name,
Address: node.Address,
Gateway: "",
Network: network,
Router: r.options.Id,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
Metadata: node.Metadata,
})
}
return routes
}
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *rtr) manageRoutes(service *registry.Service, action, network string) error {
// action is the routing table action
action = strings.ToLower(action)
// create a set of routes from the service
routes := r.createRoutes(service, network)
// if its a delete action and there's no nodes
// it means we need to wipe out all the routes
// for that service
if action == "delete" && len(routes) == 0 {
// delete the service entirely
r.table.deleteService(service.Name, network)
return nil
}
// create the routes in the table
for _, route := range routes {
logger.Tracef("Creating route %v domain: %v", route, network)
if err := r.manageRoute(route, action); err != nil {
return err
}
}
return nil
}
// manageRegistryRoutes applies action to all routes of each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *rtr) loadRoutes(reg registry.Registry) error {
services, err := reg.ListServices(registry.ListDomain(registry.WildcardDomain))
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
}
// add each service node as a separate route
for _, service := range services {
// get the services domain from metadata. Fallback to wildcard.
domain := getDomain(service)
// create the routes
routes := r.createRoutes(service, domain)
// if the routes exist save them
if len(routes) > 0 {
logger.Tracef("Creating routes for service %v domain: %v", service, domain)
for _, rt := range routes {
err := r.table.Create(rt)
// update the route to prevent it from expiring
if err == router.ErrDuplicateRoute {
err = r.table.Update(rt)
}
if err != nil {
logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
}
}
continue
}
// otherwise get all the service info
// get the service to retrieve all its info
srvs, err := reg.GetService(service.Name, registry.GetDomain(domain))
if err != nil {
logger.Tracef("Failed to get service %s domain: %s", service.Name, domain)
continue
}
// manage the routes for all returned services
for _, srv := range srvs {
routes := r.createRoutes(srv, domain)
if len(routes) > 0 {
logger.Tracef("Creating routes for service %v domain: %v", srv, domain)
for _, rt := range routes {
err := r.table.Create(rt)
// update the route to prevent it from expiring
if err == router.ErrDuplicateRoute {
err = r.table.Update(rt)
}
if err != nil {
logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
}
}
}
}
}
return nil
}
// lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) lookup(service string) ([]router.Route, error) {
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
if err == registry.ErrNotFound {
logger.Tracef("Failed to find route for %s", service)
return nil, router.ErrRouteNotFound
} else if err != nil {
logger.Tracef("Failed to find route for %s: %v", service, err)
return nil, fmt.Errorf("failed getting services: %v", err)
}
var routes []router.Route
for _, srv := range services {
domain := getDomain(srv)
// TODO: should we continue to send the event indicating we created a route?
// lookup is only called in the query path so probably not
routes = append(routes, r.createRoutes(srv, domain)...)
}
return routes, nil
}
// watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *rtr) watchRegistry(w registry.Watcher) error {
exit := make(chan bool)
defer func() {
close(exit)
}()
go func() {
defer w.Stop()
select {
case <-exit:
return
case <-r.initChan:
return
case <-r.exit:
return
}
}()
for {
// get the next service
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
return err
}
break
}
// don't process nil entries
if res.Service == nil {
logger.Trace("Received a nil service")
continue
}
logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service)
// get the services domain from metadata. Fallback to wildcard.
domain := getDomain(res.Service)
// create/update or delete the route
if err := r.manageRoutes(res.Service, res.Action, domain); err != nil {
return err
}
}
return nil
}
// start the router. Should be called under lock.
func (r *rtr) start() error {
if r.running {
return nil
}
if r.options.Precache {
// add all local service routes into the routing table
if err := r.loadRoutes(r.options.Registry); err != nil {
return fmt.Errorf("failed loading registry routes: %s", err)
}
}
// add default gateway into routing table
if r.options.Gateway != "" {
// note, the only non-default value is the gateway
route := router.Route{
Service: "*",
Address: "*",
Gateway: r.options.Gateway,
Network: "*",
Router: r.options.Id,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
return fmt.Errorf("failed adding default gateway route: %s", err)
}
}
// create error and exit channels
r.exit = make(chan bool)
// periodically refresh all the routes
go func() {
t1 := time.NewTicker(RefreshInterval)
defer t1.Stop()
t2 := time.NewTicker(PruneInterval)
defer t2.Stop()
for {
select {
case <-r.exit:
return
case <-t2.C:
r.table.pruneRoutes(RefreshInterval)
case <-t1.C:
if err := r.loadRoutes(r.options.Registry); err != nil {
logger.Debugf("failed refreshing registry routes: %s", err)
}
}
}
}()
go func() {
for {
select {
case <-r.exit:
return
default:
logger.Tracef("Router starting registry watch")
w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("failed creating registry watcher: %v", err)
}
time.Sleep(time.Second)
continue
}
// watchRegistry calls stop when it's done
if err := r.watchRegistry(w); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Error watching the registry: %v", err)
}
time.Sleep(time.Second)
}
}
}
}()
r.running = true
return nil
}
// Lookup routes in the routing table
func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) {
return r.Table().Query(q...)
}
// Watch routes
func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return r.table.Watch(opts...)
}
// Close the router
func (r *rtr) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
}
r.running = false
return nil
}
// String prints debugging information about router
func (r *rtr) String() string {
return "registry"
}

View File

@ -1,27 +0,0 @@
// +build ignore
package registry
import (
"os"
"testing"
"github.com/unistack-org/micro/v3/registry/memory"
"github.com/unistack-org/micro/v3/router"
)
func routerTestSetup() router.Router {
r := memory.NewRegistry()
return NewRouter(router.Registry(r))
}
func TestRouterClose(t *testing.T) {
r := routerTestSetup()
if err := r.Close(); err != nil {
t.Errorf("failed to stop router: %v", err)
}
if len(os.Getenv("INTEGRATION_TESTS")) == 0 {
t.Logf("TestRouterStartStop STOPPED")
}
}

View File

@ -1,385 +0,0 @@
package registry
import (
"sync"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/router"
)
// table is an in-memory routing table
type table struct {
sync.RWMutex
// lookup for a service
lookup func(string) ([]router.Route, error)
// routes stores service routes
routes map[string]map[uint64]*route
// watchers stores table watchers
watchers map[string]*tableWatcher
}
type route struct {
route router.Route
updated time.Time
}
// newtable creates a new routing table and returns it
func newTable(lookup func(string) ([]router.Route, error), opts ...router.Option) *table {
return &table{
lookup: lookup,
routes: make(map[string]map[uint64]*route),
watchers: make(map[string]*tableWatcher),
}
}
// pruneRoutes will prune routes older than the time specified
func (t *table) pruneRoutes(olderThan time.Duration) {
var routes []router.Route
t.Lock()
// search for all the routes
for _, routeList := range t.routes {
for _, r := range routeList {
// if any route is older than
if time.Since(r.updated).Seconds() > olderThan.Seconds() {
routes = append(routes, r.route)
}
}
}
t.Unlock()
// delete the routes we've found
for _, route := range routes {
t.Delete(route)
}
}
// deleteService removes the entire service
func (t *table) deleteService(service, network string) {
t.Lock()
defer t.Unlock()
routes, ok := t.routes[service]
if !ok {
return
}
// delete the routes for the service
for hash, rt := range routes {
// TODO: check if this causes a problem
// with * in the network if that is a thing
// or blank strings
if rt.route.Network != network {
continue
}
delete(routes, hash)
}
// delete the map for the service if its empty
if len(routes) == 0 {
delete(t.routes, service)
return
}
// save the routes
t.routes[service] = routes
}
// sendEvent sends events to all subscribed watchers
func (t *table) sendEvent(e *router.Event) {
t.RLock()
defer t.RUnlock()
if len(e.Id) == 0 {
e.Id = uuid.New().String()
}
for _, w := range t.watchers {
select {
case w.resChan <- e:
case <-w.done:
// don't block forever
case <-time.After(time.Second):
}
}
}
// Create creates new route in the routing table
func (t *table) Create(r router.Route) error {
service := r.Service
sum := r.Hash()
t.Lock()
defer t.Unlock()
// check if there are any routes in the table for the route destination
if _, ok := t.routes[service]; !ok {
t.routes[service] = make(map[uint64]*route)
}
// add new route to the table for the route destination
if _, ok := t.routes[service][sum]; ok {
return router.ErrDuplicateRoute
}
// create the route
t.routes[service][sum] = &route{r, time.Now()}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router emitting %s for route: %s", router.Create, r.Address)
}
// send a route created event
go t.sendEvent(&router.Event{Type: router.Create, Timestamp: time.Now(), Route: r})
return nil
}
// Delete deletes the route from the routing table
func (t *table) Delete(r router.Route) error {
service := r.Service
sum := r.Hash()
t.Lock()
defer t.Unlock()
if _, ok := t.routes[service]; !ok {
return router.ErrRouteNotFound
}
if _, ok := t.routes[service][sum]; !ok {
return router.ErrRouteNotFound
}
// delete the route from the service
delete(t.routes[service], sum)
// delete the whole map if there are no routes left
if len(t.routes[service]) == 0 {
delete(t.routes, service)
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router emitting %s for route: %s", router.Delete, r.Address)
}
go t.sendEvent(&router.Event{Type: router.Delete, Timestamp: time.Now(), Route: r})
return nil
}
// Update updates routing table with the new route
func (t *table) Update(r router.Route) error {
service := r.Service
sum := r.Hash()
t.Lock()
defer t.Unlock()
// check if the route destination has any routes in the table
if _, ok := t.routes[service]; !ok {
t.routes[service] = make(map[uint64]*route)
}
if _, ok := t.routes[service][sum]; !ok {
// update the route
t.routes[service][sum] = &route{r, time.Now()}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router emitting %s for route: %s", router.Update, r.Address)
}
go t.sendEvent(&router.Event{Type: router.Update, Timestamp: time.Now(), Route: r})
return nil
}
// just update the route, but dont emit Update event
t.routes[service][sum] = &route{r, time.Now()}
return nil
}
// List returns a list of all routes in the table
func (t *table) List() ([]router.Route, error) {
t.RLock()
defer t.RUnlock()
var routes []router.Route
for _, rmap := range t.routes {
for _, route := range rmap {
routes = append(routes, route.route)
}
}
return routes, nil
}
// isMatch checks if the route matches given query options
func isMatch(route router.Route, address, gateway, network, rtr, link string) bool {
// matches the values provided
match := func(a, b string) bool {
if a == "*" || b == "*" || a == b {
return true
}
return false
}
// a simple struct to hold our values
type compare struct {
a string
b string
}
// compare the following values
values := []compare{
{gateway, route.Gateway},
{network, route.Network},
{rtr, route.Router},
{address, route.Address},
{link, route.Link},
}
for _, v := range values {
// attempt to match each value
if !match(v.a, v.b) {
return false
}
}
return true
}
// filterRoutes finds all the routes for given network and router and returns them
func filterRoutes(routes map[uint64]*route, opts router.QueryOptions) []router.Route {
address := opts.Address
gateway := opts.Gateway
network := opts.Network
rtr := opts.Router
link := opts.Link
// routeMap stores the routes we're going to advertise
routeMap := make(map[string][]router.Route)
var routeCnt int
for _, rt := range routes {
// get the actual route
route := rt.route
if isMatch(route, address, gateway, network, rtr, link) {
// add matchihg route to the routeMap
routeKey := route.Service + "@" + route.Network
routeMap[routeKey] = append(routeMap[routeKey], route)
routeCnt++
}
}
results := make([]router.Route, 0, routeCnt)
for _, route := range routeMap {
results = append(results, route...)
}
return results
}
// Lookup queries routing table and returns all routes that match the lookup query
func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) {
// create new query options
opts := router.NewQuery(q...)
// create a cwslicelist of query results
results := make([]router.Route, 0, len(t.routes))
// readAndFilter routes for this service under read lock.
readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) {
t.RLock()
defer t.RUnlock()
routes, ok := t.routes[q.Service]
if !ok || len(routes) == 0 {
return nil, false
}
return filterRoutes(routes, q), true
}
if opts.Service != "*" {
// try and load services from the cache
if routes, ok := readAndFilter(opts); ok {
return routes, nil
}
// lookup the route and try again
// TODO: move this logic out of the hot path
// being hammered on queries will require multiple lookups
routes, err := t.lookup(opts.Service)
if err != nil {
return nil, err
}
// cache the routes
for _, rt := range routes {
t.Create(rt)
}
// try again
if routes, ok := readAndFilter(opts); ok {
return routes, nil
}
return nil, router.ErrRouteNotFound
}
// search through all destinations
t.RLock()
for _, routes := range t.routes {
// filter the routes
found := filterRoutes(routes, opts)
// ensure we don't append zero length routes
if len(found) == 0 {
continue
}
results = append(results, found...)
}
t.RUnlock()
return results, nil
}
// Watch returns routing table entry watcher
func (t *table) Watch(opts ...router.WatchOption) (router.Watcher, error) {
// by default watch everything
wopts := router.WatchOptions{
Service: "*",
}
for _, o := range opts {
o(&wopts)
}
w := &tableWatcher{
id: uuid.New().String(),
opts: wopts,
resChan: make(chan *router.Event, 10),
done: make(chan struct{}),
}
// when the watcher is stopped delete it
go func() {
<-w.done
t.Lock()
delete(t.watchers, w.id)
t.Unlock()
}()
// save the watcher
t.Lock()
t.watchers[w.id] = w
t.Unlock()
return w, nil
}

View File

@ -1,355 +0,0 @@
// +build ignore
package registry
import (
"fmt"
"testing"
"github.com/unistack-org/micro/v3/router"
)
func testSetup(t *testing.T) (*table, router.Route) {
r, err := NewRouter()
if err != nil {
t.Fatal(err)
}
routr := r.(*rtr)
table := newTable(routr.lookup)
route := router.Route{
Service: "dest.svc",
Address: "dest.addr",
Gateway: "dest.gw",
Network: "dest.network",
Router: "src.router",
Link: "det.link",
Metric: 10,
}
return table, route
}
func TestCreate(t *testing.T) {
table, route := testSetup(t)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
// adds new route for the original destination
route.Gateway = "dest.gw2"
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
// adding the same route under Insert policy must error
if err := table.Create(route); err != router.ErrDuplicateRoute {
t.Fatalf("error adding route. Expected error: %s, found: %s", router.ErrDuplicateRoute, err)
}
}
func TestDelete(t *testing.T) {
table, route := testSetup(t)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
// should fail to delete non-existent route
prevSvc := route.Service
route.Service = "randDest"
if err := table.Delete(route); err != router.ErrRouteNotFound {
t.Fatalf("error deleting route. Expected: %s, found: %s", router.ErrRouteNotFound, err)
}
// we should be able to delete the existing route
route.Service = prevSvc
if err := table.Delete(route); err != nil {
t.Fatalf("error deleting route: %s", err)
}
}
func TestUpdate(t *testing.T) {
table, route := testSetup(t)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
// change the metric of the original route
route.Metric = 200
if err := table.Update(route); err != nil {
t.Fatalf("error updating route: %s", err)
}
// this should add a new route
route.Service = "rand.dest"
if err := table.Update(route); err != nil {
t.Fatalf("error updating route: %s", err)
}
}
func TestList(t *testing.T) {
table, route := testSetup(t)
svc := []string{"one.svc", "two.svc", "three.svc"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
routes, err := table.List()
if err != nil {
t.Fatalf("error listing routes: %s", err)
}
if len(routes) != len(svc) {
t.Fatalf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes))
}
}
func TestQuery(t *testing.T) {
table, route := testSetup(t)
svc := []string{"svc1", "svc2", "svc3", "svc1"}
net := []string{"net1", "net2", "net1", "net3"}
gw := []string{"gw1", "gw2", "gw3", "gw3"}
rtr := []string{"rtr1", "rt2", "rt3", "rtr3"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
route.Router = rtr[i]
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
// return all routes
routes, err := table.Query()
if err != nil {
t.Fatalf("error looking up routes: %s", err)
} else if len(routes) == 0 {
t.Fatalf("error looking up routes: not found")
}
// query routes particular network
network := "net1"
routes, err = table.Query(router.QueryNetwork(network))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
}
for _, route := range routes {
if route.Network != network {
t.Fatalf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
}
}
// query routes for particular gateway
gateway := "gw1"
routes, err = table.Query(router.QueryGateway(gateway))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
// query routes for particular router
rt := "rtr1"
routes, err = table.Query(router.QueryRouter(rt))
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
}
// query particular gateway and network
query := []router.QueryOption{
router.QueryGateway(gateway),
router.QueryNetwork(network),
router.QueryRouter(rt),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Gateway != gateway {
t.Fatalf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
if routes[0].Network != network {
t.Fatalf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
}
if routes[0].Router != rt {
t.Fatalf("incorrect route returned. Expected router: %s, found: %s", rt, routes[0].Router)
}
// non-existen route query
routes, err = table.Query(router.QueryService("foobar"))
if err != router.ErrRouteNotFound {
t.Fatalf("error looking up routes. Expected: %s, found: %s", router.ErrRouteNotFound, err)
}
if len(routes) != 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
// query NO routes
query = []router.QueryOption{
router.QueryGateway(gateway),
router.QueryNetwork(network),
router.QueryLink("network"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) > 0 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes))
}
// insert local routes to query
for i := 0; i < 2; i++ {
route.Link = "foobar"
route.Address = fmt.Sprintf("local.route.address-%d", i)
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
// query local routes
query = []router.QueryOption{
router.QueryGateway("*"),
router.QueryNetwork("*"),
router.QueryLink("foobar"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
}
// add two different routes for svcX with different metric
for i := 0; i < 2; i++ {
route.Service = "svcX"
route.Address = fmt.Sprintf("svcX.route.address-%d", i)
route.Metric = int64(100 + i)
route.Link = router.DefaultLink
if err := table.Create(route); err != nil {
t.Fatalf("error adding route: %s", err)
}
}
query = []router.QueryOption{
router.QueryService("svcX"),
}
routes, err = table.Query(query...)
if err != nil {
t.Fatalf("error looking up routes: %s", err)
}
if len(routes) != 2 {
t.Fatalf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
}
func TestFallback(t *testing.T) {
r := &rtr{
options: router.DefaultOptions(),
}
route := router.Route{
Service: "go.micro.service.foo",
Router: r.options.Id,
Link: router.DefaultLink,
Metric: router.DefaultLocalMetric,
}
r.table = newTable(func(s string) ([]router.Route, error) {
return []router.Route{route}, nil
})
r.start()
rts, err := r.Lookup(router.QueryService("go.micro.service.foo"))
if err != nil {
t.Fatalf("error looking up service %s", err)
}
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
}
// deleting from the table but the next query should invoke the fallback that we passed during new table creation
if err := r.table.Delete(route); err != nil {
t.Fatalf("error deleting route %s", err)
}
rts, err = r.Lookup(router.QueryService("go.micro.service.foo"))
if err != nil {
t.Fatalf("error looking up service %s", err)
}
if len(rts) != 1 {
t.Fatalf("incorrect number of routes returned %d", len(rts))
}
}
func TestFallbackError(t *testing.T) {
r := &rtr{
options: router.DefaultOptions(),
}
r.table = newTable(func(s string) ([]router.Route, error) {
return nil, fmt.Errorf("ERROR")
})
r.start()
_, err := r.Lookup(router.QueryService("go.micro.service.foo"))
if err == nil {
t.Fatalf("expected error looking up service but none returned")
}
}

View File

@ -1,52 +0,0 @@
package registry
import (
"sync"
"github.com/unistack-org/micro/v3/router"
)
// tableWatcher implements routing table Watcher
type tableWatcher struct {
sync.RWMutex
id string
opts router.WatchOptions
resChan chan *router.Event
done chan struct{}
}
// Next returns the next noticed action taken on table
// TODO: right now we only allow to watch particular service
func (w *tableWatcher) Next() (*router.Event, error) {
for {
select {
case res := <-w.resChan:
switch w.opts.Service {
case res.Route.Service, "*":
return res, nil
default:
continue
}
case <-w.done:
return nil, router.ErrWatcherStopped
}
}
}
// Chan returns watcher events channel
func (w *tableWatcher) Chan() (<-chan *router.Event, error) {
return w.resChan, nil
}
// Stop stops routing table watcher
func (w *tableWatcher) Stop() {
w.Lock()
defer w.Unlock()
select {
case <-w.done:
return
default:
close(w.done)
}
}

View File

@ -1,82 +0,0 @@
package static
import (
"github.com/unistack-org/micro/v3/router"
)
// NewRouter returns an initialized static router
func NewRouter(opts ...router.Option) router.Router {
options := router.DefaultOptions()
for _, o := range opts {
o(&options)
}
return &static{options, new(table)}
}
type static struct {
options router.Options
table router.Table
}
func (s *static) Init(opts ...router.Option) error {
for _, o := range opts {
o(&s.options)
}
return nil
}
func (s *static) Options() router.Options {
return s.options
}
func (s *static) Table() router.Table {
return nil
}
func (s *static) Lookup(opts ...router.QueryOption) ([]router.Route, error) {
return s.table.Query(opts...)
}
func (s *static) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return nil, nil
}
func (s *static) Close() error {
return nil
}
func (s *static) String() string {
return "static"
}
type table struct{}
func (t *table) Create(router.Route) error {
return nil
}
func (t *table) Delete(router.Route) error {
return nil
}
func (t *table) Update(router.Route) error {
return nil
}
func (t *table) List() ([]router.Route, error) {
return nil, nil
}
func (t *table) Query(opts ...router.QueryOption) ([]router.Route, error) {
options := router.NewQuery(opts...)
return []router.Route{
{
Address: options.Service,
Service: options.Address,
Gateway: options.Gateway,
Network: options.Network,
Router: options.Router,
},
}, nil
}