registry/memory: watcher bug fixes (#1740)
* registry/memory: watcher bugfixes * registry/memory: fix nil watcher bug * registry/memory: fix watcher test
This commit is contained in:
parent
687a5e2e58
commit
51b4ab0abc
@ -177,6 +177,13 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption
|
|||||||
srvs = make(services)
|
srvs = make(services)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// domain is set in metadata so it can be passed to watchers
|
||||||
|
if s.Metadata == nil {
|
||||||
|
s.Metadata = map[string]string{"domain": options.Domain}
|
||||||
|
} else {
|
||||||
|
s.Metadata["domain"] = options.Domain
|
||||||
|
}
|
||||||
|
|
||||||
// ensure the service name exists
|
// ensure the service name exists
|
||||||
r := serviceToRecord(s, options.TTL)
|
r := serviceToRecord(s, options.TTL)
|
||||||
if _, ok := srvs[s.Name]; !ok {
|
if _, ok := srvs[s.Name]; !ok {
|
||||||
@ -189,7 +196,7 @@ func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption
|
|||||||
logger.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version)
|
logger.Debugf("Registry added new service: %s, version: %s", s.Name, s.Version)
|
||||||
}
|
}
|
||||||
m.records[options.Domain] = srvs
|
m.records[options.Domain] = srvs
|
||||||
go m.sendEvent(®istry.Result{Action: "update", Service: s})
|
go m.sendEvent(®istry.Result{Action: "create", Service: s})
|
||||||
}
|
}
|
||||||
|
|
||||||
addedNodes := false
|
addedNodes := false
|
||||||
@ -245,6 +252,13 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp
|
|||||||
options.Domain = registry.DefaultDomain
|
options.Domain = registry.DefaultDomain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// domain is set in metadata so it can be passed to watchers
|
||||||
|
if s.Metadata == nil {
|
||||||
|
s.Metadata = map[string]string{"domain": options.Domain}
|
||||||
|
} else {
|
||||||
|
s.Metadata["domain"] = options.Domain
|
||||||
|
}
|
||||||
|
|
||||||
// if the domain doesn't exist, there is nothing to deregister
|
// if the domain doesn't exist, there is nothing to deregister
|
||||||
services, ok := m.records[options.Domain]
|
services, ok := m.records[options.Domain]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -275,6 +289,7 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp
|
|||||||
// is cleanup
|
// is cleanup
|
||||||
if len(version.Nodes) > 0 {
|
if len(version.Nodes) > 0 {
|
||||||
m.records[options.Domain][s.Name][s.Version] = version
|
m.records[options.Domain][s.Name][s.Version] = version
|
||||||
|
go m.sendEvent(®istry.Result{Action: "update", Service: s})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,6 +307,7 @@ func (m *Registry) Deregister(s *registry.Service, opts ...registry.DeregisterOp
|
|||||||
|
|
||||||
// there are other versions of the service running, so only remove this version of it
|
// there are other versions of the service running, so only remove this version of it
|
||||||
delete(m.records[options.Domain][s.Name], s.Version)
|
delete(m.records[options.Domain][s.Name], s.Version)
|
||||||
|
go m.sendEvent(®istry.Result{Action: "delete", Service: s})
|
||||||
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
|
||||||
logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version)
|
logger.Debugf("Registry removed service: %s, version: %s", s.Name, s.Version)
|
||||||
}
|
}
|
||||||
|
@ -1,27 +0,0 @@
|
|||||||
package memory
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/registry"
|
|
||||||
)
|
|
||||||
|
|
||||||
type memoryWatcher struct {
|
|
||||||
exit chan bool
|
|
||||||
opts registry.WatchOptions
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryWatcher) Next() (*registry.Result, error) {
|
|
||||||
// not implement so we just block until exit
|
|
||||||
<-m.exit
|
|
||||||
return nil, errors.New("watcher stopped")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryWatcher) Stop() {
|
|
||||||
select {
|
|
||||||
case <-m.exit:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
close(m.exit)
|
|
||||||
}
|
|
||||||
}
|
|
@ -17,13 +17,26 @@ func (m *Watcher) Next() (*registry.Result, error) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case r := <-m.res:
|
case r := <-m.res:
|
||||||
|
if r.Service == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
|
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if m.wo.Domain != registry.WildcardDomain && m.wo.Domain != m.wo.Domain {
|
|
||||||
continue
|
// extract domain from service metadata
|
||||||
|
var domain string
|
||||||
|
if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 {
|
||||||
|
domain = r.Service.Metadata["domain"]
|
||||||
|
} else {
|
||||||
|
domain = registry.DefaultDomain
|
||||||
|
}
|
||||||
|
|
||||||
|
// only send the event if watching the wildcard or this specific domain
|
||||||
|
if m.wo.Domain == registry.WildcardDomain || m.wo.Domain == domain {
|
||||||
|
return r, nil
|
||||||
}
|
}
|
||||||
return r, nil
|
|
||||||
case <-m.exit:
|
case <-m.exit:
|
||||||
return nil, errors.New("watcher stopped")
|
return nil, errors.New("watcher stopped")
|
||||||
}
|
}
|
||||||
|
@ -11,10 +11,15 @@ func TestWatcher(t *testing.T) {
|
|||||||
id: "test",
|
id: "test",
|
||||||
res: make(chan *registry.Result),
|
res: make(chan *registry.Result),
|
||||||
exit: make(chan bool),
|
exit: make(chan bool),
|
||||||
|
wo: registry.WatchOptions{
|
||||||
|
Domain: registry.WildcardDomain,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
w.res <- ®istry.Result{}
|
w.res <- ®istry.Result{
|
||||||
|
Service: ®istry.Service{Name: "foo"},
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, err := w.Next()
|
_, err := w.Next()
|
||||||
|
Loading…
Reference in New Issue
Block a user