291 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			291 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package consul
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"log"
 | 
						|
	"os"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/hashicorp/consul/api"
 | 
						|
	"github.com/hashicorp/consul/api/watch"
 | 
						|
	"github.com/micro/go-micro/registry"
 | 
						|
)
 | 
						|
 | 
						|
type consulWatcher struct {
 | 
						|
	r        *consulRegistry
 | 
						|
	wo       registry.WatchOptions
 | 
						|
	wp       *watch.Plan
 | 
						|
	watchers map[string]*watch.Plan
 | 
						|
 | 
						|
	next chan *registry.Result
 | 
						|
	exit chan bool
 | 
						|
 | 
						|
	sync.RWMutex
 | 
						|
	services map[string][]*registry.Service
 | 
						|
}
 | 
						|
 | 
						|
func newConsulWatcher(cr *consulRegistry, opts ...registry.WatchOption) (registry.Watcher, error) {
 | 
						|
	var wo registry.WatchOptions
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&wo)
 | 
						|
	}
 | 
						|
 | 
						|
	cw := &consulWatcher{
 | 
						|
		r:        cr,
 | 
						|
		wo:       wo,
 | 
						|
		exit:     make(chan bool),
 | 
						|
		next:     make(chan *registry.Result, 10),
 | 
						|
		watchers: make(map[string]*watch.Plan),
 | 
						|
		services: make(map[string][]*registry.Service),
 | 
						|
	}
 | 
						|
 | 
						|
	wp, err := watch.Parse(map[string]interface{}{"type": "services"})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	wp.Handler = cw.handle
 | 
						|
	go wp.RunWithClientAndLogger(cr.Client, log.New(os.Stderr, "", log.LstdFlags))
 | 
						|
	cw.wp = wp
 | 
						|
 | 
						|
	return cw, nil
 | 
						|
}
 | 
						|
 | 
						|
func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
 | 
						|
	entries, ok := data.([]*api.ServiceEntry)
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	serviceMap := map[string]*registry.Service{}
 | 
						|
	serviceName := ""
 | 
						|
 | 
						|
	for _, e := range entries {
 | 
						|
		serviceName = e.Service.Service
 | 
						|
		// version is now a tag
 | 
						|
		version, _ := decodeVersion(e.Service.Tags)
 | 
						|
		// service ID is now the node id
 | 
						|
		id := e.Service.ID
 | 
						|
		// key is always the version
 | 
						|
		key := version
 | 
						|
		// address is service address
 | 
						|
		address := e.Service.Address
 | 
						|
 | 
						|
		// use node address
 | 
						|
		if len(address) == 0 {
 | 
						|
			address = e.Node.Address
 | 
						|
		}
 | 
						|
 | 
						|
		svc, ok := serviceMap[key]
 | 
						|
		if !ok {
 | 
						|
			svc = ®istry.Service{
 | 
						|
				Endpoints: decodeEndpoints(e.Service.Tags),
 | 
						|
				Name:      e.Service.Service,
 | 
						|
				Version:   version,
 | 
						|
			}
 | 
						|
			serviceMap[key] = svc
 | 
						|
		}
 | 
						|
 | 
						|
		var del bool
 | 
						|
 | 
						|
		for _, check := range e.Checks {
 | 
						|
			// delete the node if the status is critical
 | 
						|
			if check.Status == "critical" {
 | 
						|
				del = true
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// if delete then skip the node
 | 
						|
		if del {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		svc.Nodes = append(svc.Nodes, ®istry.Node{
 | 
						|
			Id:       id,
 | 
						|
			Address:  fmt.Sprintf("%s:%d", address, e.Service.Port),
 | 
						|
			Metadata: decodeMetadata(e.Service.Tags),
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	cw.RLock()
 | 
						|
	// make a copy
 | 
						|
	rservices := make(map[string][]*registry.Service)
 | 
						|
	for k, v := range cw.services {
 | 
						|
		rservices[k] = v
 | 
						|
	}
 | 
						|
	cw.RUnlock()
 | 
						|
 | 
						|
	var newServices []*registry.Service
 | 
						|
 | 
						|
	// serviceMap is the new set of services keyed by name+version
 | 
						|
	for _, newService := range serviceMap {
 | 
						|
		// append to the new set of cached services
 | 
						|
		newServices = append(newServices, newService)
 | 
						|
 | 
						|
		// check if the service exists in the existing cache
 | 
						|
		oldServices, ok := rservices[serviceName]
 | 
						|
		if !ok {
 | 
						|
			// does not exist? then we're creating brand new entries
 | 
						|
			cw.next <- ®istry.Result{Action: "create", Service: newService}
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		// service exists. ok let's figure out what to update and delete version wise
 | 
						|
		action := "create"
 | 
						|
 | 
						|
		for _, oldService := range oldServices {
 | 
						|
			// does this version exist?
 | 
						|
			// no? then default to create
 | 
						|
			if oldService.Version != newService.Version {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// yes? then it's an update
 | 
						|
			action = "update"
 | 
						|
 | 
						|
			var nodes []*registry.Node
 | 
						|
			// check the old nodes to see if they've been deleted
 | 
						|
			for _, oldNode := range oldService.Nodes {
 | 
						|
				var seen bool
 | 
						|
				for _, newNode := range newService.Nodes {
 | 
						|
					if newNode.Id == oldNode.Id {
 | 
						|
						seen = true
 | 
						|
						break
 | 
						|
					}
 | 
						|
				}
 | 
						|
				// does the old node exist in the new set of nodes
 | 
						|
				// no? then delete that shit
 | 
						|
				if !seen {
 | 
						|
					nodes = append(nodes, oldNode)
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			// it's an update rather than creation
 | 
						|
			if len(nodes) > 0 {
 | 
						|
				delService := oldService
 | 
						|
				delService.Nodes = nodes
 | 
						|
				cw.next <- ®istry.Result{Action: "delete", Service: delService}
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		cw.next <- ®istry.Result{Action: action, Service: newService}
 | 
						|
	}
 | 
						|
 | 
						|
	// Now check old versions that may not be in new services map
 | 
						|
	for _, old := range rservices[serviceName] {
 | 
						|
		// old version does not exist in new version map
 | 
						|
		// kill it with fire!
 | 
						|
		if _, ok := serviceMap[old.Version]; !ok {
 | 
						|
			cw.next <- ®istry.Result{Action: "delete", Service: old}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	cw.Lock()
 | 
						|
	cw.services[serviceName] = newServices
 | 
						|
	cw.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (cw *consulWatcher) handle(idx uint64, data interface{}) {
 | 
						|
	services, ok := data.(map[string][]string)
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// add new watchers
 | 
						|
	for service, _ := range services {
 | 
						|
		// Filter on watch options
 | 
						|
		// wo.Service: Only watch services we care about
 | 
						|
		if len(cw.wo.Service) > 0 && service != cw.wo.Service {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if _, ok := cw.watchers[service]; ok {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		wp, err := watch.Parse(map[string]interface{}{
 | 
						|
			"type":    "service",
 | 
						|
			"service": service,
 | 
						|
		})
 | 
						|
		if err == nil {
 | 
						|
			wp.Handler = cw.serviceHandler
 | 
						|
			go wp.RunWithClientAndLogger(cw.r.Client, log.New(os.Stderr, "", log.LstdFlags))
 | 
						|
			cw.watchers[service] = wp
 | 
						|
			cw.next <- ®istry.Result{Action: "create", Service: ®istry.Service{Name: service}}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	cw.RLock()
 | 
						|
	// make a copy
 | 
						|
	rservices := make(map[string][]*registry.Service)
 | 
						|
	for k, v := range cw.services {
 | 
						|
		rservices[k] = v
 | 
						|
	}
 | 
						|
	cw.RUnlock()
 | 
						|
 | 
						|
	// remove unknown services from registry
 | 
						|
	// save the things we want to delete
 | 
						|
	deleted := make(map[string][]*registry.Service)
 | 
						|
 | 
						|
	for service, _ := range rservices {
 | 
						|
		if _, ok := services[service]; !ok {
 | 
						|
			cw.Lock()
 | 
						|
			// save this before deleting
 | 
						|
			deleted[service] = cw.services[service]
 | 
						|
			delete(cw.services, service)
 | 
						|
			cw.Unlock()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// remove unknown services from watchers
 | 
						|
	for service, w := range cw.watchers {
 | 
						|
		if _, ok := services[service]; !ok {
 | 
						|
			w.Stop()
 | 
						|
			delete(cw.watchers, service)
 | 
						|
			for _, oldService := range deleted[service] {
 | 
						|
				// send a delete for the service nodes that we're removing
 | 
						|
				cw.next <- ®istry.Result{Action: "delete", Service: oldService}
 | 
						|
			}
 | 
						|
			// sent the empty list as the last resort to indicate to delete the entire service
 | 
						|
			cw.next <- ®istry.Result{Action: "delete", Service: ®istry.Service{Name: service}}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (cw *consulWatcher) Next() (*registry.Result, error) {
 | 
						|
	select {
 | 
						|
	case <-cw.exit:
 | 
						|
		return nil, registry.ErrWatcherStopped
 | 
						|
	case r, ok := <-cw.next:
 | 
						|
		if !ok {
 | 
						|
			return nil, registry.ErrWatcherStopped
 | 
						|
		}
 | 
						|
		return r, nil
 | 
						|
	}
 | 
						|
	// NOTE: This is a dead code path: e.g. it will never be reached
 | 
						|
	// as we return in all previous code paths never leading to this return
 | 
						|
	return nil, registry.ErrWatcherStopped
 | 
						|
}
 | 
						|
 | 
						|
func (cw *consulWatcher) Stop() {
 | 
						|
	select {
 | 
						|
	case <-cw.exit:
 | 
						|
		return
 | 
						|
	default:
 | 
						|
		close(cw.exit)
 | 
						|
		if cw.wp == nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		cw.wp.Stop()
 | 
						|
 | 
						|
		// drain results
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-cw.next:
 | 
						|
			default:
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |