Merge pull request #781 from milosgajdos83/hash-reg-service
Emit memory registry event only when it actually happens
This commit is contained in:
		| @@ -10,6 +10,10 @@ import ( | |||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	timeout = time.Millisecond * 10 | ||||||
|  | ) | ||||||
|  |  | ||||||
| type Registry struct { | type Registry struct { | ||||||
| 	options registry.Options | 	options registry.Options | ||||||
|  |  | ||||||
| @@ -18,11 +22,28 @@ type Registry struct { | |||||||
| 	Watchers map[string]*Watcher | 	Watchers map[string]*Watcher | ||||||
| } | } | ||||||
|  |  | ||||||
| var ( | func NewRegistry(opts ...registry.Option) registry.Registry { | ||||||
| 	timeout = time.Millisecond * 10 | 	options := registry.Options{ | ||||||
| ) | 		Context: context.Background(), | ||||||
|  | 	} | ||||||
|  |  | ||||||
| func (m *Registry) watch(r *registry.Result) { | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	services := getServices(options.Context) | ||||||
|  | 	if services == nil { | ||||||
|  | 		services = make(map[string][]*registry.Service) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &Registry{ | ||||||
|  | 		options:  options, | ||||||
|  | 		Services: services, | ||||||
|  | 		Watchers: make(map[string]*Watcher), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Registry) sendEvent(r *registry.Result) { | ||||||
| 	var watchers []*Watcher | 	var watchers []*Watcher | ||||||
|  |  | ||||||
| 	m.RLock() | 	m.RLock() | ||||||
| @@ -87,30 +108,55 @@ func (m *Registry) ListServices() ([]*registry.Service, error) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error { | func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error { | ||||||
| 	go m.watch(®istry.Result{Action: "update", Service: s}) |  | ||||||
| 	m.Lock() | 	m.Lock() | ||||||
|  | 	defer m.Unlock() | ||||||
|  |  | ||||||
| 	if service, ok := m.Services[s.Name]; !ok { | 	if service, ok := m.Services[s.Name]; !ok { | ||||||
| 		m.Services[s.Name] = []*registry.Service{s} | 		m.Services[s.Name] = []*registry.Service{s} | ||||||
|  | 		go m.sendEvent(®istry.Result{Action: "update", Service: s}) | ||||||
| 	} else { | 	} else { | ||||||
| 		m.Services[s.Name] = registry.Merge(service, []*registry.Service{s}) | 		svcCount := len(service) | ||||||
|  | 		svcNodeCounts := make(map[string]map[string]int) | ||||||
|  | 		for _, s := range service { | ||||||
|  | 			if _, ok := svcNodeCounts[s.Name]; !ok { | ||||||
|  | 				svcNodeCounts[s.Name] = make(map[string]int) | ||||||
|  | 			} | ||||||
|  | 			if _, ok := svcNodeCounts[s.Name][s.Version]; !ok { | ||||||
|  | 				svcNodeCounts[s.Name][s.Version] = len(s.Nodes) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		// if merged count and original service counts changed we added new version of the service | ||||||
|  | 		merged := registry.Merge(service, []*registry.Service{s}) | ||||||
|  | 		if len(merged) != svcCount { | ||||||
|  | 			m.Services[s.Name] = merged | ||||||
|  | 			go m.sendEvent(®istry.Result{Action: "update", Service: s}) | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		// if the node count for a particular service has changed we added a new node to the service | ||||||
|  | 		for _, s := range merged { | ||||||
|  | 			if len(s.Nodes) != svcNodeCounts[s.Name][s.Version] { | ||||||
|  | 				m.Services[s.Name] = merged | ||||||
|  | 				go m.sendEvent(®istry.Result{Action: "update", Service: s}) | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	m.Unlock() |  | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *Registry) Deregister(s *registry.Service) error { | func (m *Registry) Deregister(s *registry.Service) error { | ||||||
| 	go m.watch(®istry.Result{Action: "delete", Service: s}) |  | ||||||
|  |  | ||||||
| 	m.Lock() | 	m.Lock() | ||||||
|  | 	defer m.Unlock() | ||||||
|  |  | ||||||
| 	if service, ok := m.Services[s.Name]; ok { | 	if service, ok := m.Services[s.Name]; ok { | ||||||
|  | 		go m.sendEvent(®istry.Result{Action: "delete", Service: s}) | ||||||
| 		if service := registry.Remove(service, []*registry.Service{s}); len(service) == 0 { | 		if service := registry.Remove(service, []*registry.Service{s}); len(service) == 0 { | ||||||
| 			delete(m.Services, s.Name) | 			delete(m.Services, s.Name) | ||||||
| 		} else { | 		} else { | ||||||
| 			m.Services[s.Name] = service | 			m.Services[s.Name] = service | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	m.Unlock() |  | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| @@ -137,24 +183,3 @@ func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) | |||||||
| func (m *Registry) String() string { | func (m *Registry) String() string { | ||||||
| 	return "memory" | 	return "memory" | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewRegistry(opts ...registry.Option) registry.Registry { |  | ||||||
| 	options := registry.Options{ |  | ||||||
| 		Context: context.Background(), |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&options) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	services := getServices(options.Context) |  | ||||||
| 	if services == nil { |  | ||||||
| 		services = make(map[string][]*registry.Service) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &Registry{ |  | ||||||
| 		options:  options, |  | ||||||
| 		Services: services, |  | ||||||
| 		Watchers: make(map[string]*Watcher), |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user