micro/registry/consul/watcher.go

278 lines
5.9 KiB
Go
Raw Normal View History

2019-01-15 16:50:37 +00:00
package consul
2015-02-14 23:00:47 +00:00
import (
2015-12-05 01:12:29 +00:00
"errors"
"sync"
2015-02-14 23:00:47 +00:00
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
2019-01-15 16:50:37 +00:00
"github.com/micro/go-micro/registry"
2015-02-14 23:00:47 +00:00
)
type consulWatcher struct {
2015-12-05 01:12:29 +00:00
r *consulRegistry
2019-01-15 16:50:37 +00:00
wo registry.WatchOptions
wp *watch.Plan
watchers map[string]*watch.Plan
2015-02-14 23:00:47 +00:00
2019-01-15 16:50:37 +00:00
next chan *registry.Result
exit chan bool
2015-12-05 01:12:29 +00:00
sync.RWMutex
2019-01-15 16:50:37 +00:00
services map[string][]*registry.Service
2015-02-14 23:00:47 +00:00
}
2019-01-15 16:50:37 +00:00
func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
2018-02-19 17:12:37 +00:00
for _, o := range opts {
o(&wo)
}
cw := &consulWatcher{
2015-12-05 01:12:29 +00:00
r: cr,
2018-02-19 17:12:37 +00:00
wo: wo,
exit: make(chan bool),
2019-01-15 16:50:37 +00:00
next: make(chan *registry.Result, 10),
watchers: make(map[string]*watch.Plan),
2019-01-15 16:50:37 +00:00
services: make(map[string][]*registry.Service),
}
wp, err := watch.Parse(map[string]interface{}{"type": "services"})
if err != nil {
return nil, err
}
2015-12-05 01:12:29 +00:00
wp.Handler = cw.handle
go wp.Run(cr.Address)
cw.wp = wp
return cw, nil
}
func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
2015-02-14 23:00:47 +00:00
entries, ok := data.([]*api.ServiceEntry)
if !ok {
return
}
2019-01-15 16:50:37 +00:00
serviceMap := map[string]*registry.Service{}
2015-11-08 01:48:48 +00:00
serviceName := ""
2015-02-14 23:00:47 +00:00
for _, e := range entries {
2015-11-08 01:48:48 +00:00
serviceName = e.Service.Service
2016-01-26 21:10:27 +00:00
// version is now a tag
2018-03-01 17:35:13 +00:00
version, _ := decodeVersion(e.Service.Tags)
2016-01-26 21:10:27 +00:00
// service ID is now the node id
id := e.Service.ID
// key is always the version
key := version
// address is service address
address := e.Service.Address
2018-03-01 17:35:13 +00:00
// use node address
if len(address) == 0 {
address = e.Node.Address
2015-11-08 01:48:48 +00:00
}
svc, ok := serviceMap[key]
if !ok {
2019-01-15 16:50:37 +00:00
svc = &registry.Service{
2015-11-08 01:48:48 +00:00
Endpoints: decodeEndpoints(e.Service.Tags),
Name: e.Service.Service,
Version: version,
}
serviceMap[key] = svc
}
var del bool
for _, check := range e.Checks {
// delete the node if the status is critical
if check.Status == "critical" {
del = true
break
}
}
// if delete then skip the node
if del {
continue
}
2019-01-15 16:50:37 +00:00
svc.Nodes = append(svc.Nodes, &registry.Node{
2015-11-08 01:48:48 +00:00
Id: id,
2016-01-26 21:10:27 +00:00
Address: address,
Port: e.Service.Port,
2015-05-26 22:39:48 +01:00
Metadata: decodeMetadata(e.Service.Tags),
2015-02-14 23:00:47 +00:00
})
}
2015-12-05 01:12:29 +00:00
cw.RLock()
2016-08-24 18:27:15 +01:00
// make a copy
2019-01-15 16:50:37 +00:00
rservices := make(map[string][]*registry.Service)
2016-08-24 18:27:15 +01:00
for k, v := range cw.services {
rservices[k] = v
}
2015-12-05 01:12:29 +00:00
cw.RUnlock()
2019-01-15 16:50:37 +00:00
var newServices []*registry.Service
2015-12-05 01:12:29 +00:00
// serviceMap is the new set of services keyed by name+version
for _, newService := range serviceMap {
// append to the new set of cached services
newServices = append(newServices, newService)
// check if the service exists in the existing cache
oldServices, ok := rservices[serviceName]
if !ok {
// does not exist? then we're creating brand new entries
2019-01-15 16:50:37 +00:00
cw.next <- &registry.Result{Action: "create", Service: newService}
2015-12-05 01:12:29 +00:00
continue
}
// service exists. ok let's figure out what to update and delete version wise
action := "create"
for _, oldService := range oldServices {
// does this version exist?
// no? then default to create
if oldService.Version != newService.Version {
continue
}
// yes? then it's an update
action = "update"
2019-01-15 16:50:37 +00:00
var nodes []*registry.Node
2015-12-05 01:12:29 +00:00
// check the old nodes to see if they've been deleted
for _, oldNode := range oldService.Nodes {
var seen bool
for _, newNode := range newService.Nodes {
if newNode.Id == oldNode.Id {
seen = true
break
}
}
// does the old node exist in the new set of nodes
// no? then delete that shit
if !seen {
nodes = append(nodes, oldNode)
}
}
// it's an update rather than creation
if len(nodes) > 0 {
delService := oldService
delService.Nodes = nodes
2019-01-15 16:50:37 +00:00
cw.next <- &registry.Result{Action: "delete", Service: delService}
2015-12-05 01:12:29 +00:00
}
}
2019-01-15 16:50:37 +00:00
cw.next <- &registry.Result{Action: action, Service: newService}
2015-12-05 01:12:29 +00:00
}
// Now check old versions that may not be in new services map
for _, old := range rservices[serviceName] {
// old version does not exist in new version map
// kill it with fire!
if _, ok := serviceMap[old.Version]; !ok {
2019-01-15 16:50:37 +00:00
cw.next <- &registry.Result{Action: "delete", Service: old}
2015-12-05 01:12:29 +00:00
}
2015-11-08 01:48:48 +00:00
}
2015-12-05 01:12:29 +00:00
cw.Lock()
cw.services[serviceName] = newServices
cw.Unlock()
2015-02-14 23:00:47 +00:00
}
2015-12-05 01:12:29 +00:00
func (cw *consulWatcher) handle(idx uint64, data interface{}) {
2015-02-14 23:00:47 +00:00
services, ok := data.(map[string][]string)
if !ok {
return
}
// add new watchers
for service, _ := range services {
2018-02-19 17:12:37 +00:00
// Filter on watch options
// wo.Service: Only watch services we care about
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
continue
}
2015-02-14 23:00:47 +00:00
if _, ok := cw.watchers[service]; ok {
continue
}
wp, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": service,
})
if err == nil {
wp.Handler = cw.serviceHandler
2015-12-05 01:12:29 +00:00
go wp.Run(cw.r.Address)
2015-02-14 23:00:47 +00:00
cw.watchers[service] = wp
2019-01-15 16:50:37 +00:00
cw.next <- &registry.Result{Action: "create", Service: &registry.Service{Name: service}}
2015-02-14 23:00:47 +00:00
}
}
2015-12-05 01:12:29 +00:00
cw.RLock()
// make a copy
2019-01-15 16:50:37 +00:00
rservices := make(map[string][]*registry.Service)
for k, v := range cw.services {
rservices[k] = v
}
2015-12-05 01:12:29 +00:00
cw.RUnlock()
2015-02-14 23:00:47 +00:00
// remove unknown services from registry
for service, _ := range rservices {
if _, ok := services[service]; !ok {
2015-12-05 01:12:29 +00:00
cw.Lock()
delete(cw.services, service)
cw.Unlock()
2015-02-14 23:00:47 +00:00
}
}
// remove unknown services from watchers
for service, w := range cw.watchers {
if _, ok := services[service]; !ok {
w.Stop()
delete(cw.watchers, service)
2019-01-15 16:50:37 +00:00
cw.next <- &registry.Result{Action: "delete", Service: &registry.Service{Name: service}}
2015-02-14 23:00:47 +00:00
}
}
}
2019-01-15 16:50:37 +00:00
func (cw *consulWatcher) Next() (*registry.Result, error) {
select {
case <-cw.exit:
return nil, errors.New("result chan closed")
case r, ok := <-cw.next:
if !ok {
return nil, errors.New("result chan closed")
}
return r, nil
2015-12-05 01:12:29 +00:00
}
return nil, errors.New("result chan closed")
2015-12-05 01:12:29 +00:00
}
func (cw *consulWatcher) Stop() {
select {
case <-cw.exit:
2015-02-14 23:00:47 +00:00
return
default:
close(cw.exit)
if cw.wp == nil {
return
}
cw.wp.Stop()
// drain results
for {
select {
case <-cw.next:
default:
return
}
}
2015-02-14 23:00:47 +00:00
}
}