update for latest micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-01-29 15:08:17 +03:00
parent 5e4ca41774
commit 7e3e1152a5
7 changed files with 64 additions and 360 deletions

View File

@@ -1,4 +1,4 @@
package registry
package register
import (
"fmt"
@@ -6,7 +6,7 @@ import (
"time"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/router"
)
@@ -55,12 +55,12 @@ func (r *rtr) Init(opts ...router.Option) error {
}
r.Unlock()
if r.opts.Registry == nil {
return fmt.Errorf("registry not set")
if r.opts.Register == nil {
return fmt.Errorf("register not set")
}
// push a message to the init chan so the watchers
// can reset in the case the registry was changed
// can reset in the case the register was changed
go func() {
r.initChan <- true
}()
@@ -85,9 +85,9 @@ func (r *rtr) Table() router.Table {
return r.table
}
func getDomain(srv *registry.Service) string {
func getDomain(srv *register.Service) string {
// check the service metadata for domain
// TODO: domain as Domain field in registry?
// TODO: domain as Domain field in register?
if srv.Metadata != nil && len(srv.Metadata["domain"]) > 0 {
return srv.Metadata["domain"]
} else if len(srv.Nodes) > 0 && srv.Nodes[0].Metadata != nil {
@@ -96,7 +96,7 @@ func getDomain(srv *registry.Service) string {
// otherwise return wildcard
// TODO: return GlobalDomain or PublicDomain
return registry.DefaultDomain
return register.DefaultDomain
}
// manageRoute applies action on a given route
@@ -122,7 +122,7 @@ func (r *rtr) manageRoute(route router.Route, action string) error {
}
// createRoutes turns a service into a list routes basically converting nodes to routes
func (r *rtr) createRoutes(service *registry.Service, network string) []router.Route {
func (r *rtr) createRoutes(service *register.Service, network string) []router.Route {
routes := make([]router.Route, 0, len(service.Nodes))
for _, node := range service.Nodes {
@@ -143,7 +143,7 @@ func (r *rtr) createRoutes(service *registry.Service, network string) []router.R
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *rtr) manageRoutes(service *registry.Service, action, network string) error {
func (r *rtr) manageRoutes(service *register.Service, action, network string) error {
// create a set of routes from the service
routes := r.createRoutes(service, network)
@@ -169,10 +169,10 @@ func (r *rtr) manageRoutes(service *registry.Service, action, network string) er
return nil
}
// manageRegistryRoutes applies action to all routes of each service found in the registry.
// manageRegisterRoutes applies action to all routes of each service found in the register.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *rtr) loadRoutes(reg registry.Registry) error {
services, err := reg.ListServices(r.opts.Context, registry.ListDomain(registry.WildcardDomain))
func (r *rtr) loadRoutes(reg register.Register) error {
services, err := reg.ListServices(r.opts.Context, register.ListDomain(register.WildcardDomain))
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
}
@@ -210,7 +210,7 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
// otherwise get all the service info
// get the service to retrieve all its info
srvs, err := reg.GetService(r.opts.Context, service.Name, registry.GetDomain(domain))
srvs, err := reg.LookupService(r.opts.Context, service.Name, register.LookupDomain(domain))
if err != nil {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to get service %s domain: %s", service.Name, domain)
@@ -250,11 +250,11 @@ func (r *rtr) loadRoutes(reg registry.Registry) error {
// lookup retrieves all the routes for a given service and creates them in the routing table
func (r *rtr) lookup(service string) ([]router.Route, error) {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Fetching route for %s domain: %v", service, registry.WildcardDomain)
r.opts.Logger.Tracef(r.opts.Context, "Fetching route for %s domain: %v", service, register.WildcardDomain)
}
services, err := r.opts.Registry.GetService(r.opts.Context, service, registry.GetDomain(registry.WildcardDomain))
if err == registry.ErrNotFound {
services, err := r.opts.Register.LookupService(r.opts.Context, service, register.LookupDomain(register.WildcardDomain))
if err == register.ErrNotFound {
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Failed to find route for %s", service)
}
@@ -278,9 +278,9 @@ func (r *rtr) lookup(service string) ([]router.Route, error) {
return routes, nil
}
// watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *rtr) watchRegistry(w registry.Watcher) error {
// watchRegister watches register and updates routing table based on the received events.
// It returns error if either the register watcher fails with error or if the routing table update fails.
func (r *rtr) watchRegister(w register.Watcher) error {
exit := make(chan bool)
defer func() {
@@ -304,7 +304,7 @@ func (r *rtr) watchRegistry(w registry.Watcher) error {
// get the next service
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
if err != register.ErrWatcherStopped {
return err
}
break
@@ -342,8 +342,8 @@ func (r *rtr) start() error {
if r.opts.Precache {
// add all local service routes into the routing table
if err := r.loadRoutes(r.opts.Registry); err != nil {
return fmt.Errorf("failed loading registry routes: %s", err)
if err := r.loadRoutes(r.opts.Register); err != nil {
return fmt.Errorf("failed loading register routes: %s", err)
}
}
@@ -382,9 +382,9 @@ func (r *rtr) start() error {
case <-t2.C:
r.table.pruneRoutes(RefreshInterval)
case <-t1.C:
if err := r.loadRoutes(r.opts.Registry); err != nil {
if err := r.loadRoutes(r.opts.Register); err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "failed refreshing registry routes: %s", err)
r.opts.Logger.Debugf(r.opts.Context, "failed refreshing register routes: %s", err)
}
}
}
@@ -398,21 +398,21 @@ func (r *rtr) start() error {
return
default:
if r.opts.Logger.V(logger.TraceLevel) {
r.opts.Logger.Tracef(r.opts.Context, "Router starting registry watch")
r.opts.Logger.Tracef(r.opts.Context, "Router starting register watch")
}
w, err := r.opts.Registry.Watch(r.opts.Context, registry.WatchDomain(registry.WildcardDomain))
w, err := r.opts.Register.Watch(r.opts.Context, register.WatchDomain(register.WildcardDomain))
if err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debug(r.opts.Context, "failed creating registry watcher: %v", err)
r.opts.Logger.Debug(r.opts.Context, "failed creating register watcher: %v", err)
}
time.Sleep(time.Second)
continue
}
// watchRegistry calls stop when it's done
if err := r.watchRegistry(w); err != nil {
// watchRegister calls stop when it's done
if err := r.watchRegister(w); err != nil {
if r.opts.Logger.V(logger.DebugLevel) {
r.opts.Logger.Debugf(r.opts.Context, "Error watching the registry: %v", err)
r.opts.Logger.Debugf(r.opts.Context, "Error watching the register: %v", err)
}
time.Sleep(time.Second)
}
@@ -457,5 +457,9 @@ func (r *rtr) Close() error {
// String prints debugging information about router
func (r *rtr) String() string {
return "registry"
return "register"
}
func (r *rtr) Name() string {
return r.opts.Name
}