use own fork

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-01-10 14:30:04 +03:00
parent cf3186c481
commit b9ccb43243
15 changed files with 851 additions and 78 deletions

View File

@@ -10,13 +10,12 @@ import (
"sync"
"time"
"github.com/micro/go-micro/v3/api"
"github.com/micro/go-micro/v3/api/router"
"github.com/micro/go-micro/v3/logger"
"github.com/micro/go-micro/v3/metadata"
"github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/registry/cache"
util "github.com/micro/go-micro/v3/util/router"
"github.com/unistack-org/micro/v3/api"
"github.com/unistack-org/micro/v3/api/router"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry"
util "github.com/unistack-org/micro/v3/util/router"
)
// endpoint struct, that holds compiled pcre
@@ -31,9 +30,6 @@ type registryRouter struct {
exit chan bool
opts router.Options
// registry cache
rc cache.Cache
sync.RWMutex
eps map[string]*api.Service
// compiled regexp for host and path
@@ -54,10 +50,10 @@ func (r *registryRouter) refresh() {
var attempts int
for {
services, err := r.opts.Registry.ListServices()
services, err := r.opts.Registry.ListServices(r.opts.Context)
if err != nil {
attempts++
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
if logger.V(logger.ErrorLevel) {
logger.Errorf("unable to list services: %v", err)
}
time.Sleep(time.Duration(attempts) * time.Second)
@@ -68,9 +64,9 @@ func (r *registryRouter) refresh() {
// for each service, get service and store endpoints
for _, s := range services {
service, err := r.rc.GetService(s.Name)
service, err := r.opts.Registry.GetService(r.opts.Context, s.Name)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
if logger.V(logger.ErrorLevel) {
logger.Errorf("unable to get service: %v", err)
}
continue
@@ -96,9 +92,9 @@ func (r *registryRouter) process(res *registry.Result) {
}
// get entry from cache
service, err := r.rc.GetService(res.Service.Name)
service, err := r.opts.Registry.GetService(r.opts.Context, res.Service.Name)
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
if logger.V(logger.ErrorLevel) {
logger.Errorf("unable to get %v service: %v", res.Service.Name, err)
}
return
@@ -133,7 +129,7 @@ func (r *registryRouter) store(services []*registry.Service) {
}
// if we got nothing skip
if err := api.Validate(end); err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("endpoint validation failed: %v", err)
}
continue
@@ -180,7 +176,7 @@ func (r *registryRouter) store(services []*registry.Service) {
}
hostreg, err := regexp.CompilePOSIX(h)
if err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("endpoint have invalid host regexp: %v", err)
}
continue
@@ -201,7 +197,7 @@ func (r *registryRouter) store(services []*registry.Service) {
rule, err := util.Parse(p)
if err != nil && !pcreok {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("endpoint have invalid path pattern: %v", err)
}
continue
@@ -212,7 +208,7 @@ func (r *registryRouter) store(services []*registry.Service) {
tpl := rule.Compile()
pathreg, err := util.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "")
if err != nil {
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
if logger.V(logger.TraceLevel) {
logger.Tracef("endpoint have invalid path pattern: %v", err)
}
continue
@@ -234,10 +230,10 @@ func (r *registryRouter) watch() {
}
// watch for changes
w, err := r.opts.Registry.Watch()
w, err := r.opts.Registry.Watch(r.opts.Context)
if err != nil {
attempts++
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
if logger.V(logger.ErrorLevel) {
logger.Errorf("error watching endpoints: %v", err)
}
time.Sleep(time.Duration(attempts) * time.Second)
@@ -262,7 +258,7 @@ func (r *registryRouter) watch() {
// process next event
res, err := w.Next()
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
if logger.V(logger.ErrorLevel) {
logger.Errorf("error getting next endpoint: %v", err)
}
close(ch)
@@ -283,7 +279,6 @@ func (r *registryRouter) Close() error {
return nil
default:
close(r.exit)
r.rc.Stop()
}
return nil
}
@@ -329,8 +324,8 @@ func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
if !mMatch {
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api method match %s", req.Method)
if logger.V(logger.TraceLevel) {
logger.Tracef("api method match %s", req.Method)
}
// 2. try host
@@ -352,21 +347,21 @@ func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
if !hMatch {
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api host match %s", req.URL.Host)
if logger.V(logger.TraceLevel) {
logger.Tracef("api host match %s", req.URL.Host)
}
// 3. try path via google.api path matching
for _, pathreg := range cep.pathregs {
matches, err := pathreg.Match(path, "")
if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api gpath not match %s != %v", path, pathreg)
if logger.V(logger.TraceLevel) {
logger.Tracef("api gpath not match %s != %v", path, pathreg)
}
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api gpath match %s = %v", path, pathreg)
if logger.V(logger.TraceLevel) {
logger.Tracef("api gpath match %s = %v", path, pathreg)
}
pMatch = true
ctx := req.Context()
@@ -386,13 +381,13 @@ func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) {
// 4. try path via pcre path matching
for _, pathreg := range cep.pcreregs {
if !pathreg.MatchString(req.URL.Path) {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api pcre path not match %s != %v", path, pathreg)
if logger.V(logger.TraceLevel) {
logger.Tracef("api pcre path not match %s != %v", path, pathreg)
}
continue
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("api pcre path match %s != %v", path, pathreg)
if logger.V(logger.TraceLevel) {
logger.Tracef("api pcre path match %s != %v", path, pathreg)
}
pMatch = true
break
@@ -437,7 +432,7 @@ func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
name := rp.Name
// get service
services, err := r.rc.GetService(name, registry.GetDomain(rp.Domain))
services, err := r.opts.Registry.GetService(r.opts.Context, name, registry.GetDomain(rp.Domain))
if err != nil {
return nil, err
}
@@ -481,12 +476,26 @@ func (r *registryRouter) Route(req *http.Request) (*api.Service, error) {
return nil, errors.New("unknown handler")
}
func newRouter(opts ...router.Option) *registryRouter {
func (r *registryRouter) Init(opts ...router.Option) error {
for _, o := range opts {
o(&r.opts)
}
if r.opts.Registry == nil {
return fmt.Errorf("missing Registry option")
}
return nil
}
func (r *registryRouter) String() string {
return "registry"
}
// NewRouter returns the default router
func NewRouter(opts ...router.Option) router.Router {
options := router.NewOptions(opts...)
r := &registryRouter{
exit: make(chan bool),
opts: options,
rc: cache.New(options.Registry),
eps: make(map[string]*api.Service),
ceps: make(map[string]*endpoint),
}
@@ -494,8 +503,3 @@ func newRouter(opts ...router.Option) *registryRouter {
go r.refresh()
return r
}
// NewRouter returns the default router
func NewRouter(opts ...router.Option) router.Router {
return newRouter(opts...)
}