Add consul watcher

This commit is contained in:
Asim 2015-02-14 23:00:47 +00:00
parent ed84c97a17
commit 4b494966fb
2 changed files with 136 additions and 11 deletions

View File

@ -2,17 +2,18 @@ package registry
import ( import (
"errors" "errors"
"sync"
consul "github.com/hashicorp/consul/api" consul "github.com/hashicorp/consul/api"
) )
type ConsulRegistry struct { type ConsulRegistry struct {
Address string
Client *consul.Client Client *consul.Client
}
var ( mtx sync.RWMutex
ConsulCheckTTL = "30s" services map[string]Service
) }
func (c *ConsulRegistry) Deregister(s Service) error { func (c *ConsulRegistry) Deregister(s Service) error {
if len(s.Nodes()) == 0 { if len(s.Nodes()) == 0 {
@ -51,6 +52,14 @@ func (c *ConsulRegistry) Register(s Service) error {
} }
func (c *ConsulRegistry) GetService(name string) (Service, error) { func (c *ConsulRegistry) GetService(name string) (Service, error) {
c.mtx.RLock()
service, ok := c.services[name]
c.mtx.RUnlock()
if ok {
return service, nil
}
rsp, _, err := c.Client.Catalog().Service(name, "", nil) rsp, _, err := c.Client.Catalog().Service(name, "", nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -99,10 +108,20 @@ func (c *ConsulRegistry) NewNode(id, address string, port int) Node {
} }
} }
func NewConsulRegistry() Registry { func (c *ConsulRegistry) Watch() {
client, _ := consul.NewClient(consul.DefaultConfig()) NewConsulWatcher(c)
}
return &ConsulRegistry{
Client: client, func NewConsulRegistry() Registry {
} config := consul.DefaultConfig()
client, _ := consul.NewClient(config)
cr := &ConsulRegistry{
Address: config.Address,
Client: client,
services: make(map[string]Service),
}
cr.Watch()
return cr
} }

106
registry/consul_watcher.go Normal file
View File

@ -0,0 +1,106 @@
package registry
import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
)
type ConsulWatcher struct {
Registry *ConsulRegistry
wp *watch.WatchPlan
watchers map[string]*watch.WatchPlan
}
type serviceWatcher struct {
name string
}
func (cw *ConsulWatcher) serviceHandler(idx uint64, data interface{}) {
entries, ok := data.([]*api.ServiceEntry)
if !ok {
return
}
cs := &ConsulService{}
for _, e := range entries {
cs.ServiceName = e.Service.Service
cs.ServiceNodes = append(cs.ServiceNodes, &ConsulNode{
Node: e.Node.Node,
NodeId: e.Service.ID,
NodeAddress: e.Node.Address,
NodePort: e.Service.Port,
})
}
cw.Registry.mtx.Lock()
cw.Registry.services[cs.ServiceName] = cs
cw.Registry.mtx.Unlock()
}
func (cw *ConsulWatcher) Handle(idx uint64, data interface{}) {
services, ok := data.(map[string][]string)
if !ok {
return
}
// add new watchers
for service, _ := range services {
if _, ok := cw.watchers[service]; ok {
continue
}
wp, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": service,
})
if err == nil {
wp.Handler = cw.serviceHandler
go wp.Run(cw.Registry.Address)
cw.watchers[service] = wp
}
}
cw.Registry.mtx.RLock()
rservices := cw.Registry.services
cw.Registry.mtx.RUnlock()
// remove unknown services from registry
for service, _ := range rservices {
if _, ok := services[service]; !ok {
cw.Registry.mtx.Lock()
delete(cw.Registry.services, service)
cw.Registry.mtx.Unlock()
}
}
// remove unknown services from watchers
for service, w := range cw.watchers {
if _, ok := services[service]; !ok {
w.Stop()
delete(cw.watchers, service)
}
}
}
func (cw *ConsulWatcher) Stop() {
if cw.wp == nil {
return
}
cw.wp.Stop()
}
func NewConsulWatcher(cr *ConsulRegistry) *ConsulWatcher {
cw := &ConsulWatcher{
Registry: cr,
watchers: make(map[string]*watch.WatchPlan),
}
wp, err := watch.Parse(map[string]interface{}{"type": "services"})
if err == nil {
wp.Handler = cw.Handle
go wp.Run(cr.Address)
cw.wp = wp
}
return cw
}