use own fork

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-01-16 01:12:18 +03:00
parent 900e782c4b
commit 3251b6312a
15 changed files with 1109 additions and 193 deletions

View File

@@ -2,13 +2,12 @@ package registry
import (
"fmt"
"strings"
"sync"
"time"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/router"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/router"
)
var (
@@ -24,30 +23,24 @@ type rtr struct {
running bool
table *table
options router.Options
opts router.Options
exit chan bool
initChan chan bool
}
// NewRouter creates new router and returns it
func NewRouter(opts ...router.Option) router.Router {
// get default options
options := router.DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
options := router.NewOptions(opts...)
// construct the router
r := &rtr{
options: options,
opts: options,
initChan: make(chan bool),
}
// create the new table, passing the fetchRoute method in as a fallback if
// the table doesn't contain the result for a query.
r.table = newTable()
r.table = newTable(r.lookup)
// start the router
r.start()
@@ -58,10 +51,14 @@ func NewRouter(opts ...router.Option) router.Router {
func (r *rtr) Init(opts ...router.Option) error {
r.Lock()
for _, o := range opts {
o(&r.options)
o(&r.opts)
}
r.Unlock()
if r.opts.Registry == nil {
return fmt.Errorf("registry not set")
}
// push a message to the init chan so the watchers
// can reset in the case the registry was changed
go func() {
@@ -76,7 +73,7 @@ func (r *rtr) Options() router.Options {
r.RLock()
defer r.RUnlock()
options := r.options
options := r.opts
return options
}
@@ -126,7 +123,7 @@ func (r *rtr) manageRoute(route router.Route, action string) error {
// createRoutes turns a service into a list routes basically converting nodes to routes
func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route {
var routes []router.Route
routes := make([]router.Route, 0, len(service.Nodes))
for _, node := range service.Nodes {
routes = append(routes, router.Route{
@@ -134,9 +131,9 @@ func (r *rtr) createRoutes(service *registry.Service, network string) []router.R
Address: node.Address,
Gateway: "",
Network: network,
Router: r.options.Id,
Router: r.opts.Id,
Link: router.DefaultLink,
Metric: router.DefaultMetric,
Metric: router.DefaultLocalMetric,
Metadata: node.Metadata,
})
}
@@ -147,9 +144,6 @@ func (r *rtr) createRoutes(service *registry.Service, network string) []router.R
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *rtr) manageRoutes(service *registry.Service, action, network string) error {
// action is the routing table action
action = strings.ToLower(action)
// create a set of routes from the service
routes := r.createRoutes(service, network)
@@ -164,7 +158,9 @@ func (r *rtr) manageRoutes(service *registry.Service, action, network string) er
// create the routes in the table
for _, route := range routes {
logger.Tracef("Creating route %v domain: %v", route, network)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Creating route %v domain: %v", route, network)
}
if err := r.manageRoute(route, action); err != nil {
return err
}
@@ -176,7 +172,7 @@ func (r *rtr) manageRoutes(service *registry.Service, action, network string) er
// manageRegistryRoutes applies action to all routes of each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *rtr) loadRoutes(reg registry.Registry) error {
services, err := reg.ListServices(registry.ListDomain(registry.WildcardDomain))
services, err := reg.ListServices(r.opts.Context, registry.ListDomain(registry.WildcardDomain))
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
}
@@ -191,7 +187,9 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
// if the routes exist save them
if len(routes) > 0 {
logger.Tracef("Creating routes for service %v domain: %v", service, domain)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Creating routes for service %v domain: %v", service, domain)
}
for _, rt := range routes {
err := r.table.Create(rt)
@@ -201,7 +199,9 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
}
if err != nil {
logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
if r.opts.Logger.V(logger.ErrorLevel) {
r.opts.Logger.Errorf(r.opts.Context, "Error creating route for service %v in domain %v: %v", service, domain, err)
}
}
}
continue
@@ -210,9 +210,11 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
// otherwise get all the service info
// get the service to retrieve all its info
srvs, err := reg.GetService(service.Name, registry.GetDomain(domain))
srvs, err := reg.GetService(r.opts.Context, service.Name, registry.GetDomain(domain))
if err != nil {
logger.Tracef("Failed to get service %s domain: %s", service.Name, domain)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to get service %s domain: %s", service.Name, domain)
}
continue
}
@@ -221,7 +223,9 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
routes := r.createRoutes(srv, domain)
if len(routes) > 0 {
logger.Tracef("Creating routes for service %v domain: %v", srv, domain)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Creating routes for service %v domain: %v", srv, domain)
}
for _, rt := range routes {
err := r.table.Create(rt)
@@ -231,7 +235,9 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
}
if err != nil {
logger.Errorf("Error creating route for service %v in domain %v: %v", service, domain, err)
if r.opts.Logger.V(logger.ErrorLevel) {
r.opts.Logger.Errorf(r.opts.Context, "Error creating route for service %v in domain %v: %v", service, domain, err)
}
}
}
}
@@ -241,52 +247,27 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
return nil
}
// Close the router
func (r *rtr) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
}
r.running = false
return nil
}
// lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Route, error) {
q := router.NewLookup(opts...)
// if we find the routes filter and return them
routes, err := r.table.Read(router.ReadService(service))
if err == nil {
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
}
return routes, nil
func (r *rtr) lookup(service string) ([]router.Route, error) {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Fetching route for %s domain: %v", service, registry.WildcardDomain)
}
// lookup the route
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
services, err := r.opts.Registry.GetService(r.opts.Context, service, registry.GetDomain(registry.WildcardDomain))
if err == registry.ErrNotFound {
logger.Tracef("Failed to find route for %s", service)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s", service)
}
return nil, router.ErrRouteNotFound
} else if err != nil {
logger.Tracef("Failed to find route for %s: %v", service, err)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s: %v", service, err)
}
return nil, fmt.Errorf("failed getting services: %v", err)
}
var routes []router.Route
for _, srv := range services {
domain := getDomain(srv)
// TODO: should we continue to send the event indicating we created a route?
@@ -294,17 +275,6 @@ func (r *rtr) Lookup(service string, opts ...router.LookupOption) ([]router.Rout
routes = append(routes, r.createRoutes(srv, domain)...)
}
// if we're supposed to cache then save the routes
if r.options.Cache {
for _, route := range routes {
r.table.Create(route)
}
}
routes = router.Filter(routes, q)
if len(routes) == 0 {
return nil, router.ErrRouteNotFound
}
return routes, nil
}
@@ -342,11 +312,15 @@ func (r *rtr) watchRegistry(w registry.Watcher) error {
// don't process nil entries
if res.Service == nil {
logger.Trace("Received a nil service")
if logger.V(logger.TraceLevel) {
logger.Trace(r.opts.Context, "Received a nil service")
}
continue
}
logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service)
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Router dealing with next route %s %+v\n", res.Action, res.Service)
}
// get the services domain from metadata. Fallback to wildcard.
domain := getDomain(res.Service)
@@ -366,17 +340,24 @@ func (r *rtr) start() error {
return nil
}
if r.opts.Precache {
// add all local service routes into the routing table
if err := r.loadRoutes(r.opts.Registry); err != nil {
return fmt.Errorf("failed loading registry routes: %s", err)
}
}
// add default gateway into routing table
if r.options.Gateway != "" {
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := router.Route{
Service: "*",
Address: "*",
Gateway: r.options.Gateway,
Gateway: r.opts.Gateway,
Network: "*",
Router: r.options.Id,
Router: r.opts.Id,
Link: router.DefaultLink,
Metric: router.DefaultMetric,
Metric: router.DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
return fmt.Errorf("failed adding default gateway route: %s", err)
@@ -385,59 +366,27 @@ func (r *rtr) start() error {
// create error and exit channels
r.exit = make(chan bool)
r.running = true
// only cache if told to do so
if !r.options.Cache {
return nil
}
// create a refresh notify channel
refresh := make(chan bool, 1)
// fires the refresh for loading routes
refreshRoutes := func() {
select {
case refresh <- true:
default:
}
}
// refresh all the routes in the event of a failure watching the registry
// periodically refresh all the routes
go func() {
var lastRefresh time.Time
t1 := time.NewTicker(RefreshInterval)
defer t1.Stop()
// load a refresh
refreshRoutes()
t2 := time.NewTicker(PruneInterval)
defer t2.Stop()
for {
select {
case <-r.exit:
return
case <-refresh:
// don't refresh if we've done so in the past minute
if !lastRefresh.IsZero() && time.Since(lastRefresh) < time.Minute {
continue
case <-t2.C:
r.table.pruneRoutes(RefreshInterval)
case <-t1.C:
if err := r.loadRoutes(r.opts.Registry); err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "failed refreshing registry routes: %s", err)
}
}
// load new routes
if err := r.loadRoutes(r.options.Registry); err != nil {
logger.Debugf("failed refreshing registry routes: %s", err)
// in this don't prune
continue
}
// first time so nothing to prune
if !lastRefresh.IsZero() {
// prune any routes since last refresh since we've
// updated basically everything we care about
r.table.pruneRoutes(time.Since(lastRefresh))
}
// update the refresh time
lastRefresh = time.Now()
case <-time.After(RefreshInterval):
refreshRoutes()
}
}
}()
@@ -448,39 +397,64 @@ func (r *rtr) start() error {
case <-r.exit:
return
default:
logger.Tracef("Router starting registry watch")
w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Router starting registry watch")
}
w, err := r.opts.Registry.Watch(r.opts.Context, registry.WatchDomain(registry.WildcardDomain))
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("failed creating registry watcher: %v", err)
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debug(r.opts.Context, "failed creating registry watcher: %v", err)
}
time.Sleep(time.Second)
// in the event of an error reload routes
refreshRoutes()
continue
}
// watchRegistry calls stop when it's done
if err := r.watchRegistry(w); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Error watching the registry: %v", err)
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "Error watching the registry: %v", err)
}
time.Sleep(time.Second)
// in the event of an error reload routes
refreshRoutes()
}
}
}
}()
r.running = true
return nil
}
// Lookup routes in the routing table
func (r *rtr) Lookup(q ...router.QueryOption) ([]router.Route, error) {
return r.Table().Query(q...)
}
// Watch routes
func (r *rtr) Watch(opts ...router.WatchOption) (router.Watcher, error) {
return r.table.Watch(opts...)
}
// Close the router
func (r *rtr) Close() error {
r.Lock()
defer r.Unlock()
select {
case <-r.exit:
return nil
default:
if !r.running {
return nil
}
close(r.exit)
}
r.running = false
return nil
}
// String prints debugging information about router
func (r *rtr) String() string {
return "registry"