From 8c7c27c573f5453ed0bf4078d710071553fe40ae Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Thu, 18 Jun 2020 12:39:19 +0100 Subject: [PATCH] registry/memory: add support for domain options (#1713) * registry/memory: add support for the domain options * registry/memory: swap Fatal test cases with Error * registry/memory: fix wildcard not found bug * registry/memory: replace locks with rlocks * registry/memory: fix deregistration bug --- registry/memory/memory.go | 279 ++++++++++++++++++++++++--------- registry/memory/memory_test.go | 36 +++++ registry/memory/util.go | 5 +- registry/memory/watcher.go | 3 + 4 files changed, 251 insertions(+), 72 deletions(-) diff --git a/registry/memory/memory.go b/registry/memory/memory.go index 44f22439..4d6bef71 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -14,6 +14,7 @@ import ( var ( sendEventTime = 10 * time.Millisecond ttlPruneTime = time.Second + defaultDomain = "micro" ) type node struct { @@ -34,27 +35,32 @@ type Registry struct { options registry.Options sync.RWMutex - records map[string]map[string]*record + // records is a KV map with domain name as the key and a services map as the value + records map[string]services watchers map[string]*Watcher } +// services is a KV map with service name as the key and a map of records as the value +type services map[string]map[string]*record + +// NewRegistry returns an initialized in-memory registry func NewRegistry(opts ...registry.Option) registry.Registry { options := registry.Options{ Context: context.Background(), } - for _, o := range opts { o(&options) } + // records can be passed for testing purposes records := getServiceRecords(options.Context) if records == nil { - records = make(map[string]map[string]*record) + records = make(services) } reg := &Registry{ options: options, - records: records, + records: map[string]services{defaultDomain: records}, watchers: make(map[string]*Watcher), } @@ -71,14 +77,16 @@ func (m *Registry) ttlPrune() { select { case <-prune.C: m.Lock() - for name, records := range m.records { - for version, record := range records { - for id, n := range record.Nodes { - if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Registry TTL expired for node %s of service %s", n.Id, name) + for domain, services := range m.records { + for service, versions := range services { + for version, record := range versions { + for id, n := range record.Nodes { + if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry TTL expired for node %s of service %s", n.Id, service) + } + delete(m.records[domain][service][version].Nodes, id) } - delete(m.records[name][version].Nodes, id) } } } @@ -120,22 +128,30 @@ func (m *Registry) Init(opts ...registry.Option) error { m.Lock() defer m.Unlock() - records := getServiceRecords(m.options.Context) - for name, record := range records { - // add a whole new service including all of its versions - if _, ok := m.records[name]; !ok { - m.records[name] = record + // get the existing services from the records + srvs, ok := m.records[defaultDomain] + if !ok { + srvs = make(services) + } + + // loop through the services and if it doesn't yet exist, add it to the slice. This is used for + // testing purposes. + for name, record := range getServiceRecords(m.options.Context) { + if _, ok := srvs[name]; !ok { + srvs[name] = record continue } - // add the versions of the service we dont track yet + for version, r := range record { - if _, ok := m.records[name][version]; !ok { - m.records[name][version] = r + if _, ok := srvs[name][version]; !ok { + srvs[name][version] = r continue } } } + // set the services in the registry + m.records[defaultDomain] = srvs return nil } @@ -147,34 +163,44 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption m.Lock() defer m.Unlock() + // parse the options, fallback to the default domain var options registry.RegisterOptions for _, o := range opts { o(&options) } - - r := serviceToRecord(s, options.TTL) - - if _, ok := m.records[s.Name]; !ok { - m.records[s.Name] = make(map[string]*record) + if len(options.Domain) == 0 { + options.Domain = defaultDomain } - if _, ok := m.records[s.Name][s.Version]; !ok { - m.records[s.Name][s.Version] = r + // get the services for this domain from the registry + srvs, ok := m.records[options.Domain] + if !ok { + srvs = make(services) + } + + // ensure the service name exists + r := serviceToRecord(s, options.TTL) + if _, ok := srvs[s.Name]; !ok { + srvs[s.Name] = make(map[string]*record) + } + + if _, ok := srvs[s.Name][s.Version]; !ok { + srvs[s.Name][s.Version] = r if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version) } + m.records[options.Domain] = srvs go m.sendEvent(®istry.Result{Action: "update", Service: s}) - return nil } addedNodes := false for _, n := range s.Nodes { - if _, ok := m.records[s.Name][s.Version].Nodes[n.Id]; !ok { + if _, ok := srvs[s.Name][s.Version].Nodes[n.Id]; !ok { addedNodes = true metadata := make(map[string]string) for k, v := range n.Metadata { metadata[k] = v - m.records[s.Name][s.Version].Nodes[n.Id] = &node{ + srvs[s.Name][s.Version].Nodes[n.Id] = &node{ Node: ®istry.Node{ Id: n.Id, Address: n.Address, @@ -192,18 +218,18 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption logger.Debugf("Registry added new node to service: %s, version: %s", s.Name, s.Version) } go m.sendEvent(®istry.Result{Action: "update", Service: s}) - return nil - } - - // refresh TTL and timestamp - for _, n := range s.Nodes { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Updated registration for service: %s, version: %s", s.Name, s.Version) + } else { + // refresh TTL and timestamp + for _, n := range s.Nodes { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Updated registration for service: %s, version: %s", s.Name, s.Version) + } + srvs[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL + srvs[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now() } - m.records[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL - m.records[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now() } + m.records[options.Domain] = srvs return nil } @@ -211,74 +237,185 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp m.Lock() defer m.Unlock() - if _, ok := m.records[s.Name]; ok { - if _, ok := m.records[s.Name][s.Version]; ok { - for _, n := range s.Nodes { - if _, ok := m.records[s.Name][s.Version].Nodes[n.Id]; ok { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Registry removed node from service: %s, version: %s", s.Name, s.Version) - } - delete(m.records[s.Name][s.Version].Nodes, n.Id) - } - } - if len(m.records[s.Name][s.Version].Nodes) == 0 { - delete(m.records[s.Name], s.Version) - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version) - } - } - } - if len(m.records[s.Name]) == 0 { - delete(m.records, s.Name) + // parse the options, fallback to the default domain + var options registry.DeregisterOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain + } + + // if the domain doesn't exist, there is nothing to deregister + services, ok := m.records[options.Domain] + if !ok { + return nil + } + + // if no services with this name and version exist, there is nothing to deregister + versions, ok := services[s.Name] + if !ok { + return nil + } + version, ok := versions[s.Version] + if !ok { + return nil + } + + // deregister all of the service nodes from this version + for _, n := range s.Nodes { + if _, ok := version.Nodes[n.Id]; ok { if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Registry removed service: %s", s.Name) + logger.Debugf("Registry removed node from service: %s, version: %s", s.Name, s.Version) } + delete(version.Nodes, n.Id) } + } + + // if the nodes not empty, we replace the version in the store and exist, the rest of the logic + // is cleanup + if len(version.Nodes) > 0 { + m.records[options.Domain][s.Name][s.Version] = version + return nil + } + + // if this version was the only version of the service, we can remove the whole service from the + // registry and exit + if len(versions) == 1 { + delete(m.records[options.Domain], s.Name) go m.sendEvent(®istry.Result{Action: "delete", Service: s}) + + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry removed service: %s", s.Name) + } + return nil + } + + // there are other versions of the service running, so only remove this version of it + delete(m.records[options.Domain][s.Name], s.Version) + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version) } return nil } func (m *Registry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) { + // parse the options, fallback to the default domain + var options registry.GetOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain + } + + // if it's a wildcard domain, return from all domains + if options.Domain == registry.WildcardDomain { + m.RLock() + recs := m.records + m.RUnlock() + + var services []*registry.Service + for domain := range recs { + srvs, err := m.GetService(name, append(opts, registry.GetDomain(domain))...) + if err == registry.ErrNotFound { + continue + } else if err != nil { + return nil, err + } + services = append(services, srvs...) + } + + if len(services) == 0 { + return nil, registry.ErrNotFound + } + return services, nil + } + m.RLock() defer m.RUnlock() - records, ok := m.records[name] + // check the domain exists + services, ok := m.records[options.Domain] if !ok { return nil, registry.ErrNotFound } - services := make([]*registry.Service, len(m.records[name])) - i := 0 - for _, record := range records { - services[i] = recordToService(record) - i++ + // check the service exists + versions, ok := services[name] + if !ok || len(versions) == 0 { + return nil, registry.ErrNotFound } - return services, nil + // serialize the response + result := make([]*registry.Service, len(versions)) + i := 0 + for _, r := range versions { + result[i] = recordToService(r, options.Domain) + i++ + } + return result, nil } func (m *Registry) ListServices(opts ...registry.ListOption) ([]*registry.Service, error) { + // parse the options, fallback to the default domain + var options registry.ListOptions + for _, o := range opts { + o(&options) + } + if len(options.Domain) == 0 { + options.Domain = defaultDomain + } + + // if it's a wildcard domain, list from all domains + if options.Domain == registry.WildcardDomain { + m.RLock() + recs := m.records + m.RUnlock() + + var services []*registry.Service + for domain := range recs { + srvs, err := m.ListServices(append(opts, registry.ListDomain(domain))...) + if err != nil { + return nil, err + } + services = append(services, srvs...) + } + + return services, nil + } + m.RLock() defer m.RUnlock() - var services []*registry.Service - for _, records := range m.records { - for _, record := range records { - services = append(services, recordToService(record)) - } + // ensure the domain exists + services, ok := m.records[options.Domain] + if !ok { + return make([]*registry.Service, 0), nil } - return services, nil + // serialize the result, each version counts as an individual service + var result []*registry.Service + for domain, service := range services { + for _, version := range service { + result = append(result, recordToService(version, domain)) + } + } + return result, nil } func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) { + // parse the options, fallback to the default domain var wo registry.WatchOptions for _, o := range opts { o(&wo) } + if len(wo.Domain) == 0 { + wo.Domain = defaultDomain + } + // construct the watcher w := &Watcher{ exit: make(chan bool), res: make(chan *registry.Result), diff --git a/registry/memory/memory_test.go b/registry/memory/memory_test.go index 0244a8da..faf1a1d8 100644 --- a/registry/memory/memory_test.go +++ b/registry/memory/memory_test.go @@ -244,3 +244,39 @@ func TestMemoryRegistryTTLConcurrent(t *testing.T) { } } } + +func TestMemoryWildcard(t *testing.T) { + m := NewRegistry() + testSrv := ®istry.Service{Name: "foo", Version: "1.0.0"} + + if err := m.Register(testSrv, registry.RegisterDomain("one")); err != nil { + t.Fatalf("Register err: %v", err) + } + if err := m.Register(testSrv, registry.RegisterDomain("two")); err != nil { + t.Fatalf("Register err: %v", err) + } + + if recs, err := m.ListServices(registry.ListDomain("one")); err != nil { + t.Errorf("List err: %v", err) + } else if len(recs) != 1 { + t.Errorf("Expected 1 record, got %v", len(recs)) + } + + if recs, err := m.ListServices(registry.ListDomain("*")); err != nil { + t.Errorf("List err: %v", err) + } else if len(recs) != 2 { + t.Errorf("Expected 2 records, got %v", len(recs)) + } + + if recs, err := m.GetService(testSrv.Name, registry.GetDomain("one")); err != nil { + t.Errorf("Get err: %v", err) + } else if len(recs) != 1 { + t.Errorf("Expected 1 record, got %v", len(recs)) + } + + if recs, err := m.GetService(testSrv.Name, registry.GetDomain("*")); err != nil { + t.Errorf("Get err: %v", err) + } else if len(recs) != 2 { + t.Errorf("Expected 2 records, got %v", len(recs)) + } +} diff --git a/registry/memory/util.go b/registry/memory/util.go index 69cfc8de..b1f3830b 100644 --- a/registry/memory/util.go +++ b/registry/memory/util.go @@ -35,12 +35,15 @@ func serviceToRecord(s *registry.Service, ttl time.Duration) *record { } } -func recordToService(r *record) *registry.Service { +func recordToService(r *record, domain string) *registry.Service { metadata := make(map[string]string, len(r.Metadata)) for k, v := range r.Metadata { metadata[k] = v } + // set the domain in metadata so it can be determined when a wildcard query is performed + metadata["domain"] = domain + endpoints := make([]*registry.Endpoint, len(r.Endpoints)) for i, e := range r.Endpoints { request := new(registry.Value) diff --git a/registry/memory/watcher.go b/registry/memory/watcher.go index 76ae6440..87c853a1 100644 --- a/registry/memory/watcher.go +++ b/registry/memory/watcher.go @@ -20,6 +20,9 @@ func (m *Watcher) Next() (*registry.Result, error) { if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name { continue } + if m.wo.Domain != registry.WildcardDomain && m.wo.Domain != m.wo.Domain { + continue + } return r, nil case <-m.exit: return nil, errors.New("watcher stopped")