From aefd052dd7ad8721793489b678e20cd365597956 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 6 Aug 2020 12:42:14 +0100 Subject: [PATCH] Etcd router bug fixing etcd path prefix matching name (#1899) * add logging and don't get nodes where they exist in router * add more logging * Fix the etcd bug for name matching of keys and prefixes matching names --- registry/etcd/etcd.go | 63 ++++++++++++++++++++++++++----------- registry/etcd/etcd_test.go | 53 +++++++++++++++++++++++++++++++ router/registry/registry.go | 1 + router/registry/table.go | 10 +++--- 4 files changed, 104 insertions(+), 23 deletions(-) create mode 100644 registry/etcd/etcd_test.go diff --git a/registry/etcd/etcd.go b/registry/etcd/etcd.go index 0960bf3f..9d909985 100644 --- a/registry/etcd/etcd.go +++ b/registry/etcd/etcd.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "encoding/json" "errors" - "fmt" "net" "path" "sort" @@ -116,6 +115,32 @@ func configure(e *etcdRegistry, opts ...registry.Option) error { return nil } +// hasName checks if the key has the name we expect +// the key is a path of /prefix/domain/name/id e.g /micro/registry/domain/service/uuid +func hasName(key, prefix, name string) bool { + // strip the prefix from keys + key = strings.TrimPrefix(key, prefix) + + // split the key so we remove domain + parts := strings.Split(key, "/") + + if len(parts) == 0 { + return false + } + + if len(parts[0]) == 0 { + parts = parts[1:] + } + + // we expect a domain and then name domain/service + if len(parts) < 2 { + return false + } + + // ensure the name matches what we expect + return parts[1] == name +} + func encode(s *registry.Service) string { b, _ := json.Marshal(s) return string(b) @@ -392,18 +417,23 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r } var results []*mvccpb.KeyValue + + // TODO: refactorout wildcard, this is an incredibly expensive operation if options.Domain == registry.WildcardDomain { rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable()) if err != nil { return nil, err } - // filter using a check for the service name - keyPath := fmt.Sprintf("/%v/", serializeServiceName(name)) + // filter the results for the key we care about for _, kv := range rsp.Kvs { - if strings.Contains(string(kv.Key), keyPath) { - results = append(results, kv) + // if the key does not contain the name then pass + if !hasName(string(kv.Key), prefix, name) { + continue } + + // save the result if its what we expect + results = append(results, kv) } } else { prefix := servicePath(options.Domain, name) + "/" @@ -421,13 +451,13 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r versions := make(map[string]*registry.Service) for _, n := range results { - // key contains the domain, service name and version. hence, if a service name exists in two - // seperate domains, it'll be returned twice (for wildcard queries), this is because although - // the name is the same, the endpoints / metadata could differ - key, _ := path.Split(string(n.Key)) + // only process the things we care about + if !hasName(string(n.Key), prefix, name) { + continue + } if sn := decode(n.Value); sn != nil { - s, ok := versions[key] + s, ok := versions[sn.Version] if !ok { s = ®istry.Service{ Name: sn.Name, @@ -437,16 +467,18 @@ func (e *etcdRegistry) GetService(name string, opts ...registry.GetOption) ([]*r } versions[s.Version] = s } - s.Nodes = append(s.Nodes, sn.Nodes...) } } services := make([]*registry.Service, 0, len(versions)) + for _, service := range versions { services = append(services, service) } + logger.Tracef("[etcd] registry get service %s returned %v", name, services) + return services, nil } @@ -487,14 +519,9 @@ func (e *etcdRegistry) ListServices(opts ...registry.ListOption) ([]*registry.Se continue } - // key contains the domain, service name and version. hence, if a service name exists in two - // seperate domains, it'll be returned twice (for wildcard queries), this is because although - // the name is the same, the endpoints / metadata could differ - key, _ := path.Split(string(n.Key)) - - v, ok := versions[key] + v, ok := versions[sn.Version] if !ok { - versions[key] = sn + versions[sn.Version] = sn continue } diff --git a/registry/etcd/etcd_test.go b/registry/etcd/etcd_test.go new file mode 100644 index 00000000..d49b8955 --- /dev/null +++ b/registry/etcd/etcd_test.go @@ -0,0 +1,53 @@ +package etcd + +import ( + "testing" +) + +// test whether the name matches +func TestEtcdHasName(t *testing.T) { + testCases := []struct { + key string + prefix string + name string + expect bool + }{ + { + "/micro/registry/micro/registry", + "/micro/registry", + "registry", + true, + }, + { + "/micro/registry/micro/store", + "/micro/registry", + "registry", + false, + }, + { + "/prefix/baz/*/registry", + "/prefix/baz", + "registry", + true, + }, + { + "/prefix/baz/micro/registry", + "/prefix/baz", + "store", + false, + }, + { + "/prefix/baz/micro/registry", + "/prefix/baz", + "registry", + true, + }, + } + + for _, c := range testCases { + v := hasName(c.key, c.prefix, c.name) + if v != c.expect { + t.Fatalf("Expected %t for %v got: %t", c.expect, c, v) + } + } +} diff --git a/router/registry/registry.go b/router/registry/registry.go index 1eaa540c..6b476211 100644 --- a/router/registry/registry.go +++ b/router/registry/registry.go @@ -197,6 +197,7 @@ func (r *rtr) manageRegistryRoutes(reg registry.Registry, action string) error { // fetchRoutes retrieves all the routes for a given service and creates them in the routing table func (r *rtr) fetchRoutes(service string) error { + logger.Tracef("Fetching route for %s domain: %v", service, registry.WildcardDomain) services, err := r.options.Registry.GetService(service, registry.GetDomain(registry.WildcardDomain)) if err == registry.ErrNotFound { logger.Tracef("Failed to find route for %s", service) diff --git a/router/registry/table.go b/router/registry/table.go index 2f1d748b..5bfee761 100644 --- a/router/registry/table.go +++ b/router/registry/table.go @@ -245,21 +245,21 @@ func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) { } // readAndFilter routes for this service under read lock. - readAndFilter := func() ([]router.Route, bool) { + readAndFilter := func(q router.QueryOptions) ([]router.Route, bool) { t.RLock() defer t.RUnlock() - routes, ok := t.routes[opts.Service] + routes, ok := t.routes[q.Service] if !ok || len(routes) == 0 { return nil, false } - return findRoutes(routes, opts.Address, opts.Gateway, opts.Network, opts.Router, opts.Strategy), true + return findRoutes(routes, q.Address, q.Gateway, q.Network, q.Router, q.Strategy), true } if opts.Service != "*" { // try and load services from the cache - if routes, ok := readAndFilter(); ok { + if routes, ok := readAndFilter(opts); ok { return routes, nil } @@ -269,7 +269,7 @@ func (t *table) Query(q ...router.QueryOption) ([]router.Route, error) { } // try again - if routes, ok := readAndFilter(); ok { + if routes, ok := readAndFilter(opts); ok { return routes, nil }