From 51b4ab0abcf81be914acba69f0266de9e0c45363 Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Thu, 25 Jun 2020 11:02:35 +0100 Subject: [PATCH] registry/memory: watcher bug fixes (#1740) * registry/memory: watcher bugfixes * registry/memory: fix nil watcher bug * registry/memory: fix watcher test --- registry/memory/memory.go | 18 +++++++++++++++++- registry/memory/memory_watcher.go | 27 --------------------------- registry/memory/watcher.go | 19 ++++++++++++++++--- registry/memory/watcher_test.go | 7 ++++++- 4 files changed, 39 insertions(+), 32 deletions(-) delete mode 100644 registry/memory/memory_watcher.go diff --git a/registry/memory/memory.go b/registry/memory/memory.go index 7ec34a72..dc3c5ad7 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -177,6 +177,13 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption srvs = make(services) } + // domain is set in metadata so it can be passed to watchers + if s.Metadata == nil { + s.Metadata = map[string]string{"domain": options.Domain} + } else { + s.Metadata["domain"] = options.Domain + } + // ensure the service name exists r := serviceToRecord(s, options.TTL) if _, ok := srvs[s.Name]; !ok { @@ -189,7 +196,7 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption logger.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version) } m.records[options.Domain] = srvs - go m.sendEvent(®istry.Result{Action: "update", Service: s}) + go m.sendEvent(®istry.Result{Action: "create", Service: s}) } addedNodes := false @@ -245,6 +252,13 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp options.Domain = registry.DefaultDomain } + // domain is set in metadata so it can be passed to watchers + if s.Metadata == nil { + s.Metadata = map[string]string{"domain": options.Domain} + } else { + s.Metadata["domain"] = options.Domain + } + // if the domain doesn't exist, there is nothing to deregister services, ok := m.records[options.Domain] if !ok { @@ -275,6 +289,7 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp // is cleanup if len(version.Nodes) > 0 { m.records[options.Domain][s.Name][s.Version] = version + go m.sendEvent(®istry.Result{Action: "update", Service: s}) return nil } @@ -292,6 +307,7 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp // there are other versions of the service running, so only remove this version of it delete(m.records[options.Domain][s.Name], s.Version) + go m.sendEvent(®istry.Result{Action: "delete", Service: s}) if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version) } diff --git a/registry/memory/memory_watcher.go b/registry/memory/memory_watcher.go deleted file mode 100644 index 79346ec3..00000000 --- a/registry/memory/memory_watcher.go +++ /dev/null @@ -1,27 +0,0 @@ -package memory - -import ( - "errors" - - "github.com/micro/go-micro/v2/registry" -) - -type memoryWatcher struct { - exit chan bool - opts registry.WatchOptions -} - -func (m *memoryWatcher) Next() (*registry.Result, error) { - // not implement so we just block until exit - <-m.exit - return nil, errors.New("watcher stopped") -} - -func (m *memoryWatcher) Stop() { - select { - case <-m.exit: - return - default: - close(m.exit) - } -} diff --git a/registry/memory/watcher.go b/registry/memory/watcher.go index 87c853a1..123d4810 100644 --- a/registry/memory/watcher.go +++ b/registry/memory/watcher.go @@ -17,13 +17,26 @@ func (m *Watcher) Next() (*registry.Result, error) { for { select { case r := <-m.res: + if r.Service == nil { + continue + } + if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name { continue } - if m.wo.Domain != registry.WildcardDomain && m.wo.Domain != m.wo.Domain { - continue + + // extract domain from service metadata + var domain string + if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 { + domain = r.Service.Metadata["domain"] + } else { + domain = registry.DefaultDomain + } + + // only send the event if watching the wildcard or this specific domain + if m.wo.Domain == registry.WildcardDomain || m.wo.Domain == domain { + return r, nil } - return r, nil case <-m.exit: return nil, errors.New("watcher stopped") } diff --git a/registry/memory/watcher_test.go b/registry/memory/watcher_test.go index 37b4a4ae..4890647d 100644 --- a/registry/memory/watcher_test.go +++ b/registry/memory/watcher_test.go @@ -11,10 +11,15 @@ func TestWatcher(t *testing.T) { id: "test", res: make(chan *registry.Result), exit: make(chan bool), + wo: registry.WatchOptions{ + Domain: registry.WildcardDomain, + }, } go func() { - w.res <- ®istry.Result{} + w.res <- ®istry.Result{ + Service: ®istry.Service{Name: "foo"}, + } }() _, err := w.Next()