diff --git a/registry/consul_registry.go b/registry/consul_registry.go index fa0b9c12..0a04d00f 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "sync" consul "github.com/hashicorp/consul/api" ) @@ -13,9 +12,6 @@ import ( type consulRegistry struct { Address string Client *consul.Client - - mtx sync.RWMutex - services map[string][]*Service } func encodeEndpoints(en []*Endpoint) []string { @@ -86,9 +82,8 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry { client, _ := consul.NewClient(config) cr := &consulRegistry{ - Address: config.Address, - Client: client, - services: make(map[string][]*Service), + Address: config.Address, + Client: client, } return cr @@ -134,14 +129,6 @@ func (c *consulRegistry) Register(s *Service) error { } func (c *consulRegistry) GetService(name string) ([]*Service, error) { - c.mtx.RLock() - service, ok := c.services[name] - c.mtx.RUnlock() - - if ok { - return service, nil - } - rsp, _, err := c.Client.Catalog().Service(name, "", nil) if err != nil { return nil, err @@ -191,24 +178,13 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) { } func (c *consulRegistry) ListServices() ([]*Service, error) { - c.mtx.RLock() - serviceMap := c.services - c.mtx.RUnlock() - - var services []*Service - - if len(serviceMap) > 0 { - for _, service := range serviceMap { - services = append(services, service...) - } - return services, nil - } - rsp, _, err := c.Client.Catalog().Services(&consul.QueryOptions{}) if err != nil { return nil, err } + var services []*Service + for service, _ := range rsp { services = append(services, &Service{Name: service}) } diff --git a/registry/consul_watcher.go b/registry/consul_watcher.go index 4027bf2c..c67494da 100644 --- a/registry/consul_watcher.go +++ b/registry/consul_watcher.go @@ -1,24 +1,33 @@ package registry import ( + "errors" + "sync" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/watch" ) type consulWatcher struct { - Registry *consulRegistry + r *consulRegistry wp *watch.WatchPlan watchers map[string]*watch.WatchPlan -} -type serviceWatcher struct { - name string + once sync.Once + next chan *Result + + sync.RWMutex + services map[string][]*Service } func newConsulWatcher(cr *consulRegistry) (Watcher, error) { + var once sync.Once cw := &consulWatcher{ - Registry: cr, + r: cr, + once: once, + next: make(chan *Result, 10), watchers: make(map[string]*watch.WatchPlan), + services: make(map[string][]*Service), } wp, err := watch.Parse(map[string]interface{}{"type": "services"}) @@ -26,7 +35,7 @@ func newConsulWatcher(cr *consulRegistry) (Watcher, error) { return nil, err } - wp.Handler = cw.Handle + wp.Handler = cw.handle go wp.Run(cr.Address) cw.wp = wp @@ -73,16 +82,80 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { }) } - cw.Registry.mtx.Lock() - var services []*Service - for _, service := range serviceMap { - services = append(services, service) + cw.RLock() + rservices := cw.services + cw.RUnlock() + + var newServices []*Service + + // serviceMap is the new set of services keyed by name+version + for _, newService := range serviceMap { + // append to the new set of cached services + newServices = append(newServices, newService) + + // check if the service exists in the existing cache + oldServices, ok := rservices[serviceName] + if !ok { + // does not exist? then we're creating brand new entries + cw.next <- &Result{Action: "create", Service: newService} + continue + } + + // service exists. ok let's figure out what to update and delete version wise + action := "create" + + for _, oldService := range oldServices { + // does this version exist? + // no? then default to create + if oldService.Version != newService.Version { + continue + } + + // yes? then it's an update + action = "update" + + var nodes []*Node + // check the old nodes to see if they've been deleted + for _, oldNode := range oldService.Nodes { + var seen bool + for _, newNode := range newService.Nodes { + if newNode.Id == oldNode.Id { + seen = true + break + } + } + // does the old node exist in the new set of nodes + // no? then delete that shit + if !seen { + nodes = append(nodes, oldNode) + } + } + + // it's an update rather than creation + if len(nodes) > 0 { + delService := oldService + delService.Nodes = nodes + cw.next <- &Result{Action: "delete", Service: delService} + } + } + cw.next <- &Result{Action: action, Service: newService} } - cw.Registry.services[serviceName] = services - cw.Registry.mtx.Unlock() + + // Now check old versions that may not be in new services map + for _, old := range rservices[serviceName] { + // old version does not exist in new version map + // kill it with fire! + if _, ok := serviceMap[serviceName+old.Version]; !ok { + cw.next <- &Result{Action: "delete", Service: old} + } + } + + cw.Lock() + cw.services[serviceName] = newServices + cw.Unlock() } -func (cw *consulWatcher) Handle(idx uint64, data interface{}) { +func (cw *consulWatcher) handle(idx uint64, data interface{}) { services, ok := data.(map[string][]string) if !ok { return @@ -99,21 +172,22 @@ func (cw *consulWatcher) Handle(idx uint64, data interface{}) { }) if err == nil { wp.Handler = cw.serviceHandler - go wp.Run(cw.Registry.Address) + go wp.Run(cw.r.Address) cw.watchers[service] = wp + cw.next <- &Result{Action: "create", Service: &Service{Name: service}} } } - cw.Registry.mtx.RLock() - rservices := cw.Registry.services - cw.Registry.mtx.RUnlock() + cw.RLock() + rservices := cw.services + cw.RUnlock() // remove unknown services from registry for service, _ := range rservices { if _, ok := services[service]; !ok { - cw.Registry.mtx.Lock() - delete(cw.Registry.services, service) - cw.Registry.mtx.Unlock() + cw.Lock() + delete(cw.services, service) + cw.Unlock() } } @@ -122,13 +196,26 @@ func (cw *consulWatcher) Handle(idx uint64, data interface{}) { if _, ok := services[service]; !ok { w.Stop() delete(cw.watchers, service) + cw.next <- &Result{Action: "delete", Service: &Service{Name: service}} } } } +func (cw *consulWatcher) Next() (*Result, error) { + r, ok := <-cw.next + if !ok { + return nil, errors.New("chan closed") + } + return r, nil +} + func (cw *consulWatcher) Stop() { if cw.wp == nil { return } cw.wp.Stop() + + cw.once.Do(func() { + close(cw.next) + }) } diff --git a/registry/registry.go b/registry/registry.go index 93a5a45a..4f211d6e 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -8,10 +8,6 @@ type Registry interface { Watch() (Watcher, error) } -type Watcher interface { - Stop() -} - type options struct{} type Option func(*options) @@ -39,3 +35,7 @@ func GetService(name string) ([]*Service, error) { func ListServices() ([]*Service, error) { return DefaultRegistry.ListServices() } + +func Watch() (Watcher, error) { + return DefaultRegistry.Watch() +} diff --git a/registry/watcher.go b/registry/watcher.go new file mode 100644 index 00000000..c26cc04d --- /dev/null +++ b/registry/watcher.go @@ -0,0 +1,11 @@ +package registry + +type Watcher interface { + Next() (*Result, error) + Stop() +} + +type Result struct { + Action string + Service *Service +}