488 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			488 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package registry
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/micro/go-micro/v3/logger"
 | |
| 	"github.com/micro/go-micro/v3/registry"
 | |
| 	"github.com/micro/go-micro/v3/router"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	// RefreshInterval is the time at which we completely refresh the table
 | |
| 	RefreshInterval = time.Second * 120
 | |
| 	// PruneInterval is how often we prune the routing table
 | |
| 	PruneInterval = time.Second * 10
 | |
| )
 | |
| 
 | |
| // rtr implements router interface
 | |
| type rtr struct {
 | |
| 	sync.RWMutex
 | |
| 
 | |
| 	running  bool
 | |
| 	table    *table
 | |
| 	options  router.Options
 | |
| 	exit     chan bool
 | |
| 	initChan chan bool
 | |
| }
 | |
| 
 | |
| // NewRouter creates new router and returns it
 | |
| func NewRouter(opts ...router.Option) router.Router {
 | |
| 	// get default options
 | |
| 	options := router.DefaultOptions()
 | |
| 
 | |
| 	// apply requested options
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	// construct the router
 | |
| 	r := &rtr{
 | |
| 		options:  options,
 | |
| 		initChan: make(chan bool),
 | |
| 	}
 | |
| 
 | |
| 	// create the new table, passing the fetchRoute method in as a fallback if
 | |
| 	// the table doesn't contain the result for a query.
 | |
| 	r.table = newTable()
 | |
| 
 | |
| 	// start the router
 | |
| 	r.start()
 | |
| 	return r
 | |
| }
 | |
| 
 | |
| // Init initializes router with given options
 | |
| func (r *rtr) Init(opts ...router.Option) error {
 | |
| 	r.Lock()
 | |
| 	for _, o := range opts {
 | |
| 		o(&r.options)
 | |
| 	}
 | |
| 	r.Unlock()
 | |
| 
 | |
| 	// push a message to the init chan so the watchers
 | |
| 	// can reset in the case the registry was changed
 | |
| 	go func() {
 | |
| 		r.initChan <- true
 | |
| 	}()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Options returns router options
 | |
| func (r *rtr) Options() router.Options {
 | |
| 	r.RLock()
 | |
| 	defer r.RUnlock()
 | |
| 
 | |
| 	options := r.options
 | |
| 
 | |
| 	return options
 | |
| }
 | |
| 
 | |
| // Table returns routing table
 | |
| func (r *rtr) Table() router.Table {
 | |
| 	r.Lock()
 | |
| 	defer r.Unlock()
 | |
| 	return r.table
 | |
| }
 | |
| 
 | |
| func getDomain(srv *registry.Service) string {
 | |
| 	// check the service metadata for domain
 | |
| 	// TODO: domain as Domain field in registry?
 | |
| 	if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 {
 | |
| 		return srv.Metadata["domain"]
 | |
| 	} else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil {
 | |
| 		return srv.Nodes[0].Metadata["domain"]
 | |
| 	}
 | |
| 
 | |
| 	// otherwise return wildcard
 | |
| 	// TODO: return GlobalDomain or PublicDomain
 | |
| 	return registry.DefaultDomain
 | |
| }
 | |
| 
 | |
| // manageRoute applies action on a given route
 | |
| func (r *rtr) manageRoute(route router.Route, action string) error {
 | |
| 	switch action {
 | |
| 	case "create":
 | |
| 		if err := r.table.Create(route); err != nil && err != router.ErrDuplicateRoute {
 | |
| 			return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
 | |
| 		}
 | |
| 	case "delete":
 | |
| 		if err := r.table.Delete(route); err != nil && err != router.ErrRouteNotFound {
 | |
| 			return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
 | |
| 		}
 | |
| 	case "update":
 | |
| 		if err := r.table.Update(route); err != nil {
 | |
| 			return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
 | |
| 		}
 | |
| 	default:
 | |
| 		return fmt.Errorf("failed to manage route for service %s: unknown action %s", route.Service, action)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // createRoutes turns a service into a list routes basically converting nodes to routes
 | |
| func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route {
 | |
| 	var routes []router.Route
 | |
| 
 | |
| 	for _, node := range service.Nodes {
 | |
| 		routes = append(routes, router.Route{
 | |
| 			Service:  service.Name,
 | |
| 			Address:  node.Address,
 | |
| 			Gateway:  "",
 | |
| 			Network:  network,
 | |
| 			Router:   r.options.Id,
 | |
| 			Link:     router.DefaultLink,
 | |
| 			Metric:   router.DefaultMetric,
 | |
| 			Metadata: node.Metadata,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return routes
 | |
| }
 | |
| 
 | |
| // manageServiceRoutes applies action to all routes of the service.
 | |
| // It returns error of the action fails with error.
 | |
| func (r *rtr) manageRoutes(service *registry.Service, action, network string) error {
 | |
| 	// action is the routing table action
 | |
| 	action = strings.ToLower(action)
 | |
| 
 | |
| 	// create a set of routes from the service
 | |
| 	routes := r.createRoutes(service, network)
 | |
| 
 | |
| 	// if its a delete action and there's no nodes
 | |
| 	// it means we need to wipe out all the routes
 | |
| 	// for that service
 | |
| 	if action == "delete" && len(routes) == 0 {
 | |
| 		// delete the service entirely
 | |
| 		r.table.deleteService(service.Name, network)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// create the routes in the table
 | |
| 	for _, route := range routes {
 | |
| 		logger.Tracef("Creating route %v domain: %v", route, network)
 | |
| 		if err := r.manageRoute(route, action); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // manageRegistryRoutes applies action to all routes of each service found in the registry.
 | |
| // It returns error if either the services failed to be listed or the routing table action fails.
 | |
| func (r *rtr) loadRoutes(reg registry.Registry) error {
 | |
| 	services, err := reg.ListServices(registry.ListDomain(registry.WildcardDomain))
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed listing services: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// add each service node as a separate route
 | |
| 	for _, service := range services {
 | |
| 		// get the services domain from metadata. Fallback to wildcard.
 | |
| 		domain := getDomain(service)
 | |
| 
 | |
| 		// create the routes
 | |
| 		routes := r.createRoutes(service, domain)
 | |
| 
 | |
| 		// if the routes exist save them
 | |
| 		if len(routes) > 0 {
 | |
| 			logger.Tracef("Creating routes for service %v domain: %v", service, domain)
 | |
| 			for _, rt := range routes {
 | |
| 				err := r.table.Create(rt)
 | |
| 
 | |
| 				// update the route to prevent it from expiring
 | |
| 				if err == router.ErrDuplicateRoute {
 | |
| 					err = r.table.Update(rt)
 | |
| 				}
 | |
| 
 | |
| 				if err != nil {
 | |
| 					logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
 | |
| 				}
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// otherwise get all the service info
 | |
| 
 | |
| 		// get the service to retrieve all its info
 | |
| 		srvs, err := reg.GetService(service.Name, registry.GetDomain(domain))
 | |
| 		if err != nil {
 | |
| 			logger.Tracef("Failed to get service %s domain: %s", service.Name, domain)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// manage the routes for all returned services
 | |
| 		for _, srv := range srvs {
 | |
| 			routes := r.createRoutes(srv, domain)
 | |
| 
 | |
| 			if len(routes) > 0 {
 | |
| 				logger.Tracef("Creating routes for service %v domain: %v", srv, domain)
 | |
| 				for _, rt := range routes {
 | |
| 					err := r.table.Create(rt)
 | |
| 
 | |
| 					// update the route to prevent it from expiring
 | |
| 					if err == router.ErrDuplicateRoute {
 | |
| 						err = r.table.Update(rt)
 | |
| 					}
 | |
| 
 | |
| 					if err != nil {
 | |
| 						logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close the router
 | |
| func (r *rtr) Close() error {
 | |
| 	r.Lock()
 | |
| 	defer r.Unlock()
 | |
| 
 | |
| 	select {
 | |
| 	case <-r.exit:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		if !r.running {
 | |
| 			return nil
 | |
| 		}
 | |
| 		close(r.exit)
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	r.running = false
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // lookup retrieves all the routes for a given service and creates them in the routing table
 | |
| func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
 | |
| 	q := router.NewLookup(opts...)
 | |
| 
 | |
| 	// if we find the routes filter and return them
 | |
| 	routes, err := r.table.Read(router.ReadService(service))
 | |
| 	if err == nil {
 | |
| 		routes = router.Filter(routes, q)
 | |
| 		if len(routes) == 0 {
 | |
| 			return nil, router.ErrRouteNotFound
 | |
| 		}
 | |
| 		return routes, nil
 | |
| 	}
 | |
| 
 | |
| 	// lookup the route
 | |
| 	logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
 | |
| 
 | |
| 	services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
 | |
| 	if err == registry.ErrNotFound {
 | |
| 		logger.Tracef("Failed to find route for %s", service)
 | |
| 		return nil, router.ErrRouteNotFound
 | |
| 	} else if err != nil {
 | |
| 		logger.Tracef("Failed to find route for %s: %v", service, err)
 | |
| 		return nil, fmt.Errorf("failed getting services: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	for _, srv := range services {
 | |
| 		domain := getDomain(srv)
 | |
| 		// TODO: should we continue to send the event indicating we created a route?
 | |
| 		// lookup is only called in the query path so probably not
 | |
| 		routes = append(routes, r.createRoutes(srv, domain)...)
 | |
| 	}
 | |
| 
 | |
| 	// if we're supposed to cache then save the routes
 | |
| 	if r.options.Cache {
 | |
| 		for _, route := range routes {
 | |
| 			r.table.Create(route)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	routes = router.Filter(routes, q)
 | |
| 	if len(routes) == 0 {
 | |
| 		return nil, router.ErrRouteNotFound
 | |
| 	}
 | |
| 	return routes, nil
 | |
| }
 | |
| 
 | |
| // watchRegistry watches registry and updates routing table based on the received events.
 | |
| // It returns error if either the registry watcher fails with error or if the routing table update fails.
 | |
| func (r *rtr) watchRegistry(w registry.Watcher) error {
 | |
| 	exit := make(chan bool)
 | |
| 
 | |
| 	defer func() {
 | |
| 		close(exit)
 | |
| 	}()
 | |
| 
 | |
| 	go func() {
 | |
| 		defer w.Stop()
 | |
| 
 | |
| 		select {
 | |
| 		case <-exit:
 | |
| 			return
 | |
| 		case <-r.initChan:
 | |
| 			return
 | |
| 		case <-r.exit:
 | |
| 			return
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		// get the next service
 | |
| 		res, err := w.Next()
 | |
| 		if err != nil {
 | |
| 			if err != registry.ErrWatcherStopped {
 | |
| 				return err
 | |
| 			}
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		// don't process nil entries
 | |
| 		if res.Service == nil {
 | |
| 			logger.Trace("Received a nil service")
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service)
 | |
| 
 | |
| 		// get the services domain from metadata. Fallback to wildcard.
 | |
| 		domain := getDomain(res.Service)
 | |
| 
 | |
| 		// create/update or delete the route
 | |
| 		if err := r.manageRoutes(res.Service, res.Action, domain); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // start the router. Should be called under lock.
 | |
| func (r *rtr) start() error {
 | |
| 	if r.running {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// add default gateway into routing table
 | |
| 	if r.options.Gateway != "" {
 | |
| 		// note, the only non-default value is the gateway
 | |
| 		route := router.Route{
 | |
| 			Service: "*",
 | |
| 			Address: "*",
 | |
| 			Gateway: r.options.Gateway,
 | |
| 			Network: "*",
 | |
| 			Router:  r.options.Id,
 | |
| 			Link:    router.DefaultLink,
 | |
| 			Metric:  router.DefaultMetric,
 | |
| 		}
 | |
| 		if err := r.table.Create(route); err != nil {
 | |
| 			return fmt.Errorf("failed adding default gateway route: %s", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// create error and exit channels
 | |
| 	r.exit = make(chan bool)
 | |
| 	r.running = true
 | |
| 
 | |
| 	// only cache if told to do so
 | |
| 	if !r.options.Cache {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// create a refresh notify channel
 | |
| 	refresh := make(chan bool, 1)
 | |
| 
 | |
| 	// fires the refresh for loading routes
 | |
| 	refreshRoutes := func() {
 | |
| 		select {
 | |
| 		case refresh <- true:
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// refresh all the routes in the event of a failure watching the registry
 | |
| 	go func() {
 | |
| 		var lastRefresh time.Time
 | |
| 
 | |
| 		// load a refresh
 | |
| 		refreshRoutes()
 | |
| 
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-r.exit:
 | |
| 				return
 | |
| 			case <-refresh:
 | |
| 				// don't refresh if we've done so in the past minute
 | |
| 				if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute {
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				// load new routes
 | |
| 				if err := r.loadRoutes(r.options.Registry); err != nil {
 | |
| 					logger.Debugf("failed refreshing registry routes: %s", err)
 | |
| 					// in this don't prune
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				// first time so nothing to prune
 | |
| 				if !lastRefresh.IsZero() {
 | |
| 					// prune any routes since last refresh since we've
 | |
| 					// updated basically everything we care about
 | |
| 					r.table.pruneRoutes(time.Since(lastRefresh))
 | |
| 				}
 | |
| 
 | |
| 				// update the refresh time
 | |
| 				lastRefresh = time.Now()
 | |
| 			case <-time.After(RefreshInterval):
 | |
| 				refreshRoutes()
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-r.exit:
 | |
| 				return
 | |
| 			default:
 | |
| 				logger.Tracef("Router starting registry watch")
 | |
| 				w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
 | |
| 				if err != nil {
 | |
| 					if logger.V(logger.DebugLevel, logger.DefaultLogger) {
 | |
| 						logger.Debugf("failed creating registry watcher: %v", err)
 | |
| 					}
 | |
| 					time.Sleep(time.Second)
 | |
| 					// in the event of an error reload routes
 | |
| 					refreshRoutes()
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				// watchRegistry calls stop when it's done
 | |
| 				if err := r.watchRegistry(w); err != nil {
 | |
| 					if logger.V(logger.DebugLevel, logger.DefaultLogger) {
 | |
| 						logger.Debugf("Error watching the registry: %v", err)
 | |
| 					}
 | |
| 					time.Sleep(time.Second)
 | |
| 					// in the event of an error reload routes
 | |
| 					refreshRoutes()
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Watch routes
 | |
| func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
 | |
| 	return r.table.Watch(opts...)
 | |
| }
 | |
| 
 | |
| // String prints debugging information about router
 | |
| func (r *rtr) String() string {
 | |
| 	return "registry"
 | |
| }
 |