From 4b494966fb3bbf7acc56e0664fdecbc1f39807f4 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 14 Feb 2015 23:00:47 +0000 Subject: [PATCH] Add consul watcher --- registry/consul_registry.go | 41 ++++++++++---- registry/consul_watcher.go | 106 ++++++++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 11 deletions(-) create mode 100644 registry/consul_watcher.go diff --git a/registry/consul_registry.go b/registry/consul_registry.go index ae0d6e96..b8f59a17 100644 --- a/registry/consul_registry.go +++ b/registry/consul_registry.go @@ -2,17 +2,18 @@ package registry import ( "errors" + "sync" consul "github.com/hashicorp/consul/api" ) type ConsulRegistry struct { - Client *consul.Client -} + Address string + Client *consul.Client -var ( - ConsulCheckTTL = "30s" -) + mtx sync.RWMutex + services map[string]Service +} func (c *ConsulRegistry) Deregister(s Service) error { if len(s.Nodes()) == 0 { @@ -51,6 +52,14 @@ func (c *ConsulRegistry) Register(s Service) error { } func (c *ConsulRegistry) GetService(name string) (Service, error) { + c.mtx.RLock() + service, ok := c.services[name] + c.mtx.RUnlock() + + if ok { + return service, nil + } + rsp, _, err := c.Client.Catalog().Service(name, "", nil) if err != nil { return nil, err @@ -99,10 +108,20 @@ func (c *ConsulRegistry) NewNode(id, address string, port int) Node { } } -func NewConsulRegistry() Registry { - client, _ := consul.NewClient(consul.DefaultConfig()) - - return &ConsulRegistry{ - Client: client, - } +func (c *ConsulRegistry) Watch() { + NewConsulWatcher(c) +} + +func NewConsulRegistry() Registry { + config := consul.DefaultConfig() + client, _ := consul.NewClient(config) + + cr := &ConsulRegistry{ + Address: config.Address, + Client: client, + services: make(map[string]Service), + } + + cr.Watch() + return cr } diff --git a/registry/consul_watcher.go b/registry/consul_watcher.go new file mode 100644 index 00000000..5510968d --- /dev/null +++ b/registry/consul_watcher.go @@ -0,0 +1,106 @@ +package registry + +import ( + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/watch" +) + +type ConsulWatcher struct { + Registry *ConsulRegistry + wp *watch.WatchPlan + watchers map[string]*watch.WatchPlan +} + +type serviceWatcher struct { + name string +} + +func (cw *ConsulWatcher) serviceHandler(idx uint64, data interface{}) { + entries, ok := data.([]*api.ServiceEntry) + if !ok { + return + } + + cs := &ConsulService{} + + for _, e := range entries { + cs.ServiceName = e.Service.Service + cs.ServiceNodes = append(cs.ServiceNodes, &ConsulNode{ + Node: e.Node.Node, + NodeId: e.Service.ID, + NodeAddress: e.Node.Address, + NodePort: e.Service.Port, + }) + } + + cw.Registry.mtx.Lock() + cw.Registry.services[cs.ServiceName] = cs + cw.Registry.mtx.Unlock() +} + +func (cw *ConsulWatcher) Handle(idx uint64, data interface{}) { + services, ok := data.(map[string][]string) + if !ok { + return + } + + // add new watchers + for service, _ := range services { + if _, ok := cw.watchers[service]; ok { + continue + } + wp, err := watch.Parse(map[string]interface{}{ + "type": "service", + "service": service, + }) + if err == nil { + wp.Handler = cw.serviceHandler + go wp.Run(cw.Registry.Address) + cw.watchers[service] = wp + } + } + + cw.Registry.mtx.RLock() + rservices := cw.Registry.services + cw.Registry.mtx.RUnlock() + + // remove unknown services from registry + for service, _ := range rservices { + if _, ok := services[service]; !ok { + cw.Registry.mtx.Lock() + delete(cw.Registry.services, service) + cw.Registry.mtx.Unlock() + } + } + + // remove unknown services from watchers + for service, w := range cw.watchers { + if _, ok := services[service]; !ok { + w.Stop() + delete(cw.watchers, service) + } + } +} + +func (cw *ConsulWatcher) Stop() { + if cw.wp == nil { + return + } + cw.wp.Stop() +} + +func NewConsulWatcher(cr *ConsulRegistry) *ConsulWatcher { + cw := &ConsulWatcher{ + Registry: cr, + watchers: make(map[string]*watch.WatchPlan), + } + + wp, err := watch.Parse(map[string]interface{}{"type": "services"}) + if err == nil { + wp.Handler = cw.Handle + go wp.Run(cr.Address) + cw.wp = wp + } + + return cw +}