package consul import ( "fmt" "log" "os" "sync" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" "github.com/micro/go-micro/registry" ) type consulWatcher struct { r *consulRegistry wo registry.WatchOptions wp *watch.Plan watchers map[string]*watch.Plan next chan *registry.Result exit chan bool sync.RWMutex services map[string][]*registry.Service } func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registry.Watcher, error) { var wo registry.WatchOptions for _, o := range opts { o(&wo) } cw := &consulWatcher{ r: cr, wo: wo, exit: make(chan bool), next: make(chan *registry.Result, 10), watchers: make(map[string]*watch.Plan), services: make(map[string][]*registry.Service), } wp, err := watch.Parse(map[string]interface{}{"type": "services"}) if err != nil { return nil, err } wp.Handler = cw.handle go wp.RunWithClientAndLogger(cr.Client(), log.New(os.Stderr, "", log.LstdFlags)) cw.wp = wp return cw, nil } func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) { entries, ok := data.([]*api.ServiceEntry) if !ok { return } serviceMap := map[string]*registry.Service{} serviceName := "" for _, e := range entries { serviceName = e.Service.Service // version is now a tag version, _ := decodeVersion(e.Service.Tags) // service ID is now the node id id := e.Service.ID // key is always the version key := version // address is service address address := e.Service.Address // use node address if len(address) == 0 { address = e.Node.Address } svc, ok := serviceMap[key] if !ok { svc = ®istry.Service{ Endpoints: decodeEndpoints(e.Service.Tags), Name: e.Service.Service, Version: version, } serviceMap[key] = svc } var del bool for _, check := range e.Checks { // delete the node if the status is critical if check.Status == "critical" { del = true break } } // if delete then skip the node if del { continue } svc.Nodes = append(svc.Nodes, ®istry.Node{ Id: id, Address: fmt.Sprintf("%s:%d", address, e.Service.Port), Metadata: decodeMetadata(e.Service.Tags), }) } cw.RLock() // make a copy rservices := make(map[string][]*registry.Service) for k, v := range cw.services { rservices[k] = v } cw.RUnlock() var newServices []*registry.Service // serviceMap is the new set of services keyed by name+version for _, newService := range serviceMap { // append to the new set of cached services newServices = append(newServices, newService) // check if the service exists in the existing cache oldServices, ok := rservices[serviceName] if !ok { // does not exist? then we're creating brand new entries cw.next <- ®istry.Result{Action: "create", Service: newService} continue } // service exists. ok let's figure out what to update and delete version wise action := "create" for _, oldService := range oldServices { // does this version exist? // no? then default to create if oldService.Version != newService.Version { continue } // yes? then it's an update action = "update" var nodes []*registry.Node // check the old nodes to see if they've been deleted for _, oldNode := range oldService.Nodes { var seen bool for _, newNode := range newService.Nodes { if newNode.Id == oldNode.Id { seen = true break } } // does the old node exist in the new set of nodes // no? then delete that shit if !seen { nodes = append(nodes, oldNode) } } // it's an update rather than creation if len(nodes) > 0 { delService := oldService delService.Nodes = nodes cw.next <- ®istry.Result{Action: "delete", Service: delService} } } cw.next <- ®istry.Result{Action: action, Service: newService} } // Now check old versions that may not be in new services map for _, old := range rservices[serviceName] { // old version does not exist in new version map // kill it with fire! if _, ok := serviceMap[old.Version]; !ok { cw.next <- ®istry.Result{Action: "delete", Service: old} } } cw.Lock() cw.services[serviceName] = newServices cw.Unlock() } func (cw *consulWatcher) handle(idx uint64, data interface{}) { services, ok := data.(map[string][]string) if !ok { return } // add new watchers for service, _ := range services { // Filter on watch options // wo.Service: Only watch services we care about if len(cw.wo.Service) > 0 && service != cw.wo.Service { continue } if _, ok := cw.watchers[service]; ok { continue } wp, err := watch.Parse(map[string]interface{}{ "type": "service", "service": service, }) if err == nil { wp.Handler = cw.serviceHandler go wp.RunWithClientAndLogger(cw.r.Client(), log.New(os.Stderr, "", log.LstdFlags)) cw.watchers[service] = wp cw.next <- ®istry.Result{Action: "create", Service: ®istry.Service{Name: service}} } } cw.RLock() // make a copy rservices := make(map[string][]*registry.Service) for k, v := range cw.services { rservices[k] = v } cw.RUnlock() // remove unknown services from registry // save the things we want to delete deleted := make(map[string][]*registry.Service) for service, _ := range rservices { if _, ok := services[service]; !ok { cw.Lock() // save this before deleting deleted[service] = cw.services[service] delete(cw.services, service) cw.Unlock() } } // remove unknown services from watchers for service, w := range cw.watchers { if _, ok := services[service]; !ok { w.Stop() delete(cw.watchers, service) for _, oldService := range deleted[service] { // send a delete for the service nodes that we're removing cw.next <- ®istry.Result{Action: "delete", Service: oldService} } // sent the empty list as the last resort to indicate to delete the entire service cw.next <- ®istry.Result{Action: "delete", Service: ®istry.Service{Name: service}} } } } func (cw *consulWatcher) Next() (*registry.Result, error) { select { case <-cw.exit: return nil, registry.ErrWatcherStopped case r, ok := <-cw.next: if !ok { return nil, registry.ErrWatcherStopped } return r, nil } // NOTE: This is a dead code path: e.g. it will never be reached // as we return in all previous code paths never leading to this return return nil, registry.ErrWatcherStopped } func (cw *consulWatcher) Stop() { select { case <-cw.exit: return default: close(cw.exit) if cw.wp == nil { return } cw.wp.Stop() // drain results for { select { case <-cw.next: default: return } } } }