update for latest micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-01-29 14:56:39 +03:00
parent 637dcb5cc1
commit e8b9086eb8
3 changed files with 47 additions and 344 deletions

View File

@@ -1,5 +1,5 @@
// Package registry provides a dynamic api service router
package registry
// Package register provides a dynamic api service router
package register
import (
"errors"
@@ -14,7 +14,7 @@ import (
"github.com/unistack-org/micro/v3/api/router"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/register"
util "github.com/unistack-org/micro/v3/util/router"
)
@@ -26,7 +26,7 @@ type endpoint struct {
}
// router is the default router
type registryRouter struct {
type registerRouter struct {
exit chan bool
opts router.Options
@@ -36,7 +36,7 @@ type registryRouter struct {
ceps map[string]*endpoint
}
func (r *registryRouter) isClosed() bool {
func (r *registerRouter) isClosed() bool {
select {
case <-r.exit:
return true
@@ -46,11 +46,11 @@ func (r *registryRouter) isClosed() bool {
}
// refresh list of api services
func (r *registryRouter) refresh() {
func (r *registerRouter) refresh() {
var attempts int
for {
services, err := r.opts.Registry.ListServices(r.opts.Context)
services, err := r.opts.Register.ListServices(r.opts.Context)
if err != nil {
attempts++
if logger.V(logger.ErrorLevel) {
@@ -64,7 +64,7 @@ func (r *registryRouter) refresh() {
// for each service, get service and store endpoints
for _, s := range services {
service, err := r.opts.Registry.GetService(r.opts.Context, s.Name)
service, err := r.opts.Register.LookupService(r.opts.Context, s.Name)
if err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf(r.opts.Context, "unable to get service: %v", err)
@@ -75,7 +75,7 @@ func (r *registryRouter) refresh() {
}
// refresh list in 10 minutes... cruft
// use registry watching
// use register watching
select {
case <-time.After(time.Minute * 10):
case <-r.exit:
@@ -85,14 +85,14 @@ func (r *registryRouter) refresh() {
}
// process watch event
func (r *registryRouter) process(res *registry.Result) {
func (r *registerRouter) process(res *register.Result) {
// skip these things
if res == nil || res.Service == nil {
return
}
// get entry from cache
service, err := r.opts.Registry.GetService(r.opts.Context, res.Service.Name)
service, err := r.opts.Register.LookupService(r.opts.Context, res.Service.Name)
if err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf(r.opts.Context, "unable to get %v service: %v", res.Service.Name, err)
@@ -105,7 +105,7 @@ func (r *registryRouter) process(res *registry.Result) {
}
// store local endpoint cache
func (r *registryRouter) store(services []*registry.Service) {
func (r *registerRouter) store(services []*register.Service) {
// endpoints
eps := map[string]*api.Service{}
@@ -221,7 +221,7 @@ func (r *registryRouter) store(services []*registry.Service) {
}
// watch for endpoint changes
func (r *registryRouter) watch() {
func (r *registerRouter) watch() {
var attempts int
for {
@@ -230,7 +230,7 @@ func (r *registryRouter) watch() {
}
// watch for changes
w, err := r.opts.Registry.Watch(r.opts.Context)
w, err := r.opts.Register.Watch(r.opts.Context)
if err != nil {
attempts++
if logger.V(logger.ErrorLevel) {
@@ -269,11 +269,11 @@ func (r *registryRouter) watch() {
}
}
func (r *registryRouter) Options() router.Options {
func (r *registerRouter) Options() router.Options {
return r.opts
}
func (r *registryRouter) Close() error {
func (r *registerRouter) Close() error {
select {
case <-r.exit:
return nil
@@ -283,15 +283,15 @@ func (r *registryRouter) Close() error {
return nil
}
func (r *registryRouter) Register(ep *api.Endpoint) error {
func (r *registerRouter) Register(ep *api.Endpoint) error {
return nil
}
func (r *registryRouter) Deregister(ep *api.Endpoint) error {
func (r *registerRouter) Deregister(ep *api.Endpoint) error {
return nil
}
func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
func (r *registerRouter) Endpoint(req *http.Request) (*api.Service, error) {
if r.isClosed() {
return nil, errors.New("router closed")
}
@@ -407,7 +407,7 @@ func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
return nil, errors.New("not found")
}
func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
func (r *registerRouter) Route(req *http.Request) (*api.Service, error) {
if r.isClosed() {
return nil, errors.New("router closed")
}
@@ -432,7 +432,7 @@ func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
name := rp.Name
// get service
services, err := r.opts.Registry.GetService(r.opts.Context, name, registry.GetDomain(rp.Domain))
services, err := r.opts.Register.LookupService(r.opts.Context, name, register.GetDomain(rp.Domain))
if err != nil {
return nil, err
}
@@ -476,24 +476,24 @@ func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
return nil, errors.New("unknown handler")
}
func (r *registryRouter) Init(opts ...router.Option) error {
func (r *registerRouter) Init(opts ...router.Option) error {
for _, o := range opts {
o(&r.opts)
}
if r.opts.Registry == nil {
return fmt.Errorf("missing Registry option")
if r.opts.Register == nil {
return fmt.Errorf("missing Register option")
}
return nil
}
func (r *registryRouter) String() string {
return "registry"
func (r *registerRouter) String() string {
return "register"
}
// NewRouter returns the default router
func NewRouter(opts ...router.Option) router.Router {
options := router.NewOptions(opts...)
r := &registryRouter{
r := &registerRouter{
exit: make(chan bool),
opts: options,
eps: make(map[string]*api.Service),