Update the watch interface

This commit is contained in:
Asim 2015-12-05 01:12:29 +00:00
parent cc6555927d
commit d4f3a6070c
4 changed files with 126 additions and 52 deletions

View File

@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
consul "github.com/hashicorp/consul/api" consul "github.com/hashicorp/consul/api"
) )
@ -13,9 +12,6 @@ import (
type consulRegistry struct { type consulRegistry struct {
Address string Address string
Client *consul.Client Client *consul.Client
mtx sync.RWMutex
services map[string][]*Service
} }
func encodeEndpoints(en []*Endpoint) []string { func encodeEndpoints(en []*Endpoint) []string {
@ -88,7 +84,6 @@ func newConsulRegistry(addrs []string, opts ...Option) Registry {
cr := &consulRegistry{ cr := &consulRegistry{
Address: config.Address, Address: config.Address,
Client: client, Client: client,
services: make(map[string][]*Service),
} }
return cr return cr
@ -134,14 +129,6 @@ func (c *consulRegistry) Register(s *Service) error {
} }
func (c *consulRegistry) GetService(name string) ([]*Service, error) { func (c *consulRegistry) GetService(name string) ([]*Service, error) {
c.mtx.RLock()
service, ok := c.services[name]
c.mtx.RUnlock()
if ok {
return service, nil
}
rsp, _, err := c.Client.Catalog().Service(name, "", nil) rsp, _, err := c.Client.Catalog().Service(name, "", nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -191,24 +178,13 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
} }
func (c *consulRegistry) ListServices() ([]*Service, error) { func (c *consulRegistry) ListServices() ([]*Service, error) {
c.mtx.RLock()
serviceMap := c.services
c.mtx.RUnlock()
var services []*Service
if len(serviceMap) > 0 {
for _, service := range serviceMap {
services = append(services, service...)
}
return services, nil
}
rsp, _, err := c.Client.Catalog().Services(&consul.QueryOptions{}) rsp, _, err := c.Client.Catalog().Services(&consul.QueryOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
var services []*Service
for service, _ := range rsp { for service, _ := range rsp {
services = append(services, &Service{Name: service}) services = append(services, &Service{Name: service})
} }

View File

@ -1,24 +1,33 @@
package registry package registry
import ( import (
"errors"
"sync"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch" "github.com/hashicorp/consul/watch"
) )
type consulWatcher struct { type consulWatcher struct {
Registry *consulRegistry r *consulRegistry
wp *watch.WatchPlan wp *watch.WatchPlan
watchers map[string]*watch.WatchPlan watchers map[string]*watch.WatchPlan
}
type serviceWatcher struct { once sync.Once
name string next chan *Result
sync.RWMutex
services map[string][]*Service
} }
func newConsulWatcher(cr *consulRegistry) (Watcher, error) { func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
var once sync.Once
cw := &consulWatcher{ cw := &consulWatcher{
Registry: cr, r: cr,
once: once,
next: make(chan *Result, 10),
watchers: make(map[string]*watch.WatchPlan), watchers: make(map[string]*watch.WatchPlan),
services: make(map[string][]*Service),
} }
wp, err := watch.Parse(map[string]interface{}{"type": "services"}) wp, err := watch.Parse(map[string]interface{}{"type": "services"})
@ -26,7 +35,7 @@ func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
return nil, err return nil, err
} }
wp.Handler = cw.Handle wp.Handler = cw.handle
go wp.Run(cr.Address) go wp.Run(cr.Address)
cw.wp = wp cw.wp = wp
@ -73,16 +82,80 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
}) })
} }
cw.Registry.mtx.Lock() cw.RLock()
var services []*Service rservices := cw.services
for _, service := range serviceMap { cw.RUnlock()
services = append(services, service)
var newServices []*Service
// serviceMap is the new set of services keyed by name+version
for _, newService := range serviceMap {
// append to the new set of cached services
newServices = append(newServices, newService)
// check if the service exists in the existing cache
oldServices, ok := rservices[serviceName]
if !ok {
// does not exist? then we're creating brand new entries
cw.next <- &Result{Action: "create", Service: newService}
continue
} }
cw.Registry.services[serviceName] = services
cw.Registry.mtx.Unlock() // service exists. ok let's figure out what to update and delete version wise
action := "create"
for _, oldService := range oldServices {
// does this version exist?
// no? then default to create
if oldService.Version != newService.Version {
continue
}
// yes? then it's an update
action = "update"
var nodes []*Node
// check the old nodes to see if they've been deleted
for _, oldNode := range oldService.Nodes {
var seen bool
for _, newNode := range newService.Nodes {
if newNode.Id == oldNode.Id {
seen = true
break
}
}
// does the old node exist in the new set of nodes
// no? then delete that shit
if !seen {
nodes = append(nodes, oldNode)
}
}
// it's an update rather than creation
if len(nodes) > 0 {
delService := oldService
delService.Nodes = nodes
cw.next <- &Result{Action: "delete", Service: delService}
}
}
cw.next <- &Result{Action: action, Service: newService}
}
// Now check old versions that may not be in new services map
for _, old := range rservices[serviceName] {
// old version does not exist in new version map
// kill it with fire!
if _, ok := serviceMap[serviceName+old.Version]; !ok {
cw.next <- &Result{Action: "delete", Service: old}
}
}
cw.Lock()
cw.services[serviceName] = newServices
cw.Unlock()
} }
func (cw *consulWatcher) Handle(idx uint64, data interface{}) { func (cw *consulWatcher) handle(idx uint64, data interface{}) {
services, ok := data.(map[string][]string) services, ok := data.(map[string][]string)
if !ok { if !ok {
return return
@ -99,21 +172,22 @@ func (cw *consulWatcher) Handle(idx uint64, data interface{}) {
}) })
if err == nil { if err == nil {
wp.Handler = cw.serviceHandler wp.Handler = cw.serviceHandler
go wp.Run(cw.Registry.Address) go wp.Run(cw.r.Address)
cw.watchers[service] = wp cw.watchers[service] = wp
cw.next <- &Result{Action: "create", Service: &Service{Name: service}}
} }
} }
cw.Registry.mtx.RLock() cw.RLock()
rservices := cw.Registry.services rservices := cw.services
cw.Registry.mtx.RUnlock() cw.RUnlock()
// remove unknown services from registry // remove unknown services from registry
for service, _ := range rservices { for service, _ := range rservices {
if _, ok := services[service]; !ok { if _, ok := services[service]; !ok {
cw.Registry.mtx.Lock() cw.Lock()
delete(cw.Registry.services, service) delete(cw.services, service)
cw.Registry.mtx.Unlock() cw.Unlock()
} }
} }
@ -122,13 +196,26 @@ func (cw *consulWatcher) Handle(idx uint64, data interface{}) {
if _, ok := services[service]; !ok { if _, ok := services[service]; !ok {
w.Stop() w.Stop()
delete(cw.watchers, service) delete(cw.watchers, service)
cw.next <- &Result{Action: "delete", Service: &Service{Name: service}}
} }
} }
} }
func (cw *consulWatcher) Next() (*Result, error) {
r, ok := <-cw.next
if !ok {
return nil, errors.New("chan closed")
}
return r, nil
}
func (cw *consulWatcher) Stop() { func (cw *consulWatcher) Stop() {
if cw.wp == nil { if cw.wp == nil {
return return
} }
cw.wp.Stop() cw.wp.Stop()
cw.once.Do(func() {
close(cw.next)
})
} }

View File

@ -8,10 +8,6 @@ type Registry interface {
Watch() (Watcher, error) Watch() (Watcher, error)
} }
type Watcher interface {
Stop()
}
type options struct{} type options struct{}
type Option func(*options) type Option func(*options)
@ -39,3 +35,7 @@ func GetService(name string) ([]*Service, error) {
func ListServices() ([]*Service, error) { func ListServices() ([]*Service, error) {
return DefaultRegistry.ListServices() return DefaultRegistry.ListServices()
} }
func Watch() (Watcher, error) {
return DefaultRegistry.Watch()
}

11
registry/watcher.go Normal file
View File

@ -0,0 +1,11 @@
package registry
type Watcher interface {
Next() (*Result, error)
Stop()
}
type Result struct {
Action string
Service *Service
}