diff --git a/registry/memory/memory.go b/registry/memory/memory.go index 6c03df67..446aa0f0 100644 --- a/registry/memory/memory.go +++ b/registry/memory/memory.go @@ -10,6 +10,10 @@ import ( "github.com/micro/go-micro/registry" ) +var ( + timeout = time.Millisecond * 10 +) + type Registry struct { options registry.Options @@ -18,11 +22,28 @@ type Registry struct { Watchers map[string]*Watcher } -var ( - timeout = time.Millisecond * 10 -) +func NewRegistry(opts ...registry.Option) registry.Registry { + options := registry.Options{ + Context: context.Background(), + } -func (m *Registry) watch(r *registry.Result) { + for _, o := range opts { + o(&options) + } + + services := getServices(options.Context) + if services == nil { + services = make(map[string][]*registry.Service) + } + + return &Registry{ + options: options, + Services: services, + Watchers: make(map[string]*Watcher), + } +} + +func (m *Registry) sendEvent(r *registry.Result) { var watchers []*Watcher m.RLock() @@ -87,30 +108,55 @@ func (m *Registry) ListServices() ([]*registry.Service, error) { } func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error { - go m.watch(®istry.Result{Action: "update", Service: s}) m.Lock() + defer m.Unlock() + if service, ok := m.Services[s.Name]; !ok { m.Services[s.Name] = []*registry.Service{s} + go m.sendEvent(®istry.Result{Action: "update", Service: s}) } else { - m.Services[s.Name] = registry.Merge(service, []*registry.Service{s}) + svcCount := len(service) + svcNodeCounts := make(map[string]map[string]int) + for _, s := range service { + if _, ok := svcNodeCounts[s.Name]; !ok { + svcNodeCounts[s.Name] = make(map[string]int) + } + if _, ok := svcNodeCounts[s.Name][s.Version]; !ok { + svcNodeCounts[s.Name][s.Version] = len(s.Nodes) + } + } + // if merged count and original service counts changed we added new version of the service + merged := registry.Merge(service, []*registry.Service{s}) + if len(merged) != svcCount { + m.Services[s.Name] = merged + go m.sendEvent(®istry.Result{Action: "update", Service: s}) + return nil + } + // if the node count for a particular service has changed we added a new node to the service + for _, s := range merged { + if len(s.Nodes) != svcNodeCounts[s.Name][s.Version] { + m.Services[s.Name] = merged + go m.sendEvent(®istry.Result{Action: "update", Service: s}) + return nil + } + } } - m.Unlock() return nil } func (m *Registry) Deregister(s *registry.Service) error { - go m.watch(®istry.Result{Action: "delete", Service: s}) - m.Lock() + defer m.Unlock() + if service, ok := m.Services[s.Name]; ok { + go m.sendEvent(®istry.Result{Action: "delete", Service: s}) if service := registry.Remove(service, []*registry.Service{s}); len(service) == 0 { delete(m.Services, s.Name) } else { m.Services[s.Name] = service } } - m.Unlock() return nil } @@ -137,24 +183,3 @@ func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) func (m *Registry) String() string { return "memory" } - -func NewRegistry(opts ...registry.Option) registry.Registry { - options := registry.Options{ - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - services := getServices(options.Context) - if services == nil { - services = make(map[string][]*registry.Service) - } - - return &Registry{ - options: options, - Services: services, - Watchers: make(map[string]*Watcher), - } -}