Etcd router bug fixing etcd path prefix matching name (#1899)
* add logging and don't get nodes where they exist in router * add more logging * Fix the etcd bug for name matching of keys and prefixes matching names
This commit is contained in:
parent
2b79910ad9
commit
aefd052dd7
@ -6,7 +6,6 @@ import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"path"
|
||||
"sort"
|
||||
@ -116,6 +115,32 @@ func configure(e *etcdRegistry, opts ...registry.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasName checks if the key has the name we expect
|
||||
// the key is a path of /prefix/domain/name/id e.g /micro/registry/domain/service/uuid
|
||||
func hasName(key, prefix, name string) bool {
|
||||
// strip the prefix from keys
|
||||
key = strings.TrimPrefix(key, prefix)
|
||||
|
||||
// split the key so we remove domain
|
||||
parts := strings.Split(key, "/")
|
||||
|
||||
if len(parts) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(parts[0]) == 0 {
|
||||
parts = parts[1:]
|
||||
}
|
||||
|
||||
// we expect a domain and then name domain/service
|
||||
if len(parts) < 2 {
|
||||
return false
|
||||
}
|
||||
|
||||
// ensure the name matches what we expect
|
||||
return parts[1] == name
|
||||
}
|
||||
|
||||
func encode(s *registry.Service) string {
|
||||
b, _ := json.Marshal(s)
|
||||
return string(b)
|
||||
@ -392,18 +417,23 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
|
||||
}
|
||||
|
||||
var results []*mvccpb.KeyValue
|
||||
|
||||
// TODO: refactorout wildcard, this is an incredibly expensive operation
|
||||
if options.Domain == registry.WildcardDomain {
|
||||
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// filter using a check for the service name
|
||||
keyPath := fmt.Sprintf("/%v/", serializeServiceName(name))
|
||||
// filter the results for the key we care about
|
||||
for _, kv := range rsp.Kvs {
|
||||
if strings.Contains(string(kv.Key), keyPath) {
|
||||
results = append(results, kv)
|
||||
// if the key does not contain the name then pass
|
||||
if !hasName(string(kv.Key), prefix, name) {
|
||||
continue
|
||||
}
|
||||
|
||||
// save the result if its what we expect
|
||||
results = append(results, kv)
|
||||
}
|
||||
} else {
|
||||
prefix := servicePath(options.Domain, name) + "/"
|
||||
@ -421,13 +451,13 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
|
||||
versions := make(map[string]*registry.Service)
|
||||
|
||||
for _, n := range results {
|
||||
// key contains the domain, service name and version. hence, if a service name exists in two
|
||||
// seperate domains, it'll be returned twice (for wildcard queries), this is because although
|
||||
// the name is the same, the endpoints / metadata could differ
|
||||
key, _ := path.Split(string(n.Key))
|
||||
// only process the things we care about
|
||||
if !hasName(string(n.Key), prefix, name) {
|
||||
continue
|
||||
}
|
||||
|
||||
if sn := decode(n.Value); sn != nil {
|
||||
s, ok := versions[key]
|
||||
s, ok := versions[sn.Version]
|
||||
if !ok {
|
||||
s = ®istry.Service{
|
||||
Name: sn.Name,
|
||||
@ -437,16 +467,18 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r
|
||||
}
|
||||
versions[s.Version] = s
|
||||
}
|
||||
|
||||
s.Nodes = append(s.Nodes, sn.Nodes...)
|
||||
}
|
||||
}
|
||||
|
||||
services := make([]*registry.Service, 0, len(versions))
|
||||
|
||||
for _, service := range versions {
|
||||
services = append(services, service)
|
||||
}
|
||||
|
||||
logger.Tracef("[etcd] registry get service %s returned %v", name, services)
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
||||
@ -487,14 +519,9 @@ func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se
|
||||
continue
|
||||
}
|
||||
|
||||
// key contains the domain, service name and version. hence, if a service name exists in two
|
||||
// seperate domains, it'll be returned twice (for wildcard queries), this is because although
|
||||
// the name is the same, the endpoints / metadata could differ
|
||||
key, _ := path.Split(string(n.Key))
|
||||
|
||||
v, ok := versions[key]
|
||||
v, ok := versions[sn.Version]
|
||||
if !ok {
|
||||
versions[key] = sn
|
||||
versions[sn.Version] = sn
|
||||
continue
|
||||
}
|
||||
|
||||
|
53
registry/etcd/etcd_test.go
Normal file
53
registry/etcd/etcd_test.go
Normal file
@ -0,0 +1,53 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// test whether the name matches
|
||||
func TestEtcdHasName(t *testing.T) {
|
||||
testCases := []struct {
|
||||
key string
|
||||
prefix string
|
||||
name string
|
||||
expect bool
|
||||
}{
|
||||
{
|
||||
"/micro/registry/micro/registry",
|
||||
"/micro/registry",
|
||||
"registry",
|
||||
true,
|
||||
},
|
||||
{
|
||||
"/micro/registry/micro/store",
|
||||
"/micro/registry",
|
||||
"registry",
|
||||
false,
|
||||
},
|
||||
{
|
||||
"/prefix/baz/*/registry",
|
||||
"/prefix/baz",
|
||||
"registry",
|
||||
true,
|
||||
},
|
||||
{
|
||||
"/prefix/baz/micro/registry",
|
||||
"/prefix/baz",
|
||||
"store",
|
||||
false,
|
||||
},
|
||||
{
|
||||
"/prefix/baz/micro/registry",
|
||||
"/prefix/baz",
|
||||
"registry",
|
||||
true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range testCases {
|
||||
v := hasName(c.key, c.prefix, c.name)
|
||||
if v != c.expect {
|
||||
t.Fatalf("Expected %t for %v got: %t", c.expect, c, v)
|
||||
}
|
||||
}
|
||||
}
|
@ -197,6 +197,7 @@ func (r *rtr) manageRegistryRoutes(reg registry.Registry, action string) error {
|
||||
|
||||
// fetchRoutes retrieves all the routes for a given service and creates them in the routing table
|
||||
func (r *rtr) fetchRoutes(service string) error {
|
||||
logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain)
|
||||
services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain))
|
||||
if err == registry.ErrNotFound {
|
||||
logger.Tracef("Failed to find route for %s", service)
|
||||
|
@ -245,21 +245,21 @@ func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) {
|
||||
}
|
||||
|
||||
// readAndFilter routes for this service under read lock.
|
||||
readAndFilter := func() ([]router.Route, bool) {
|
||||
readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
routes, ok := t.routes[opts.Service]
|
||||
routes, ok := t.routes[q.Service]
|
||||
if !ok || len(routes) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), true
|
||||
return findRoutes(routes, q.Address, q.Gateway, q.Network, q.Router, q.Strategy), true
|
||||
}
|
||||
|
||||
if opts.Service != "*" {
|
||||
// try and load services from the cache
|
||||
if routes, ok := readAndFilter(); ok {
|
||||
if routes, ok := readAndFilter(opts); ok {
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
@ -269,7 +269,7 @@ func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) {
|
||||
}
|
||||
|
||||
// try again
|
||||
if routes, ok := readAndFilter(); ok {
|
||||
if routes, ok := readAndFilter(opts); ok {
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user