Kubernetes Registry (#1064)
* add teh k8s registry * add k8s reg config/cmd * go mod update
This commit is contained in:
		
							
								
								
									
										261
									
								
								registry/kubernetes/watcher.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										261
									
								
								registry/kubernetes/watcher.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,261 @@ | ||||
| package kubernetes | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/util/kubernetes/client" | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| ) | ||||
|  | ||||
| type k8sWatcher struct { | ||||
| 	registry *kregistry | ||||
| 	watcher  client.Watcher | ||||
| 	next     chan *registry.Result | ||||
| 	stop     chan bool | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	pods map[string]*client.Pod | ||||
| } | ||||
|  | ||||
| // build a cache of pods when the watcher starts. | ||||
| func (k *k8sWatcher) updateCache() ([]*registry.Result, error) { | ||||
| 	var pods client.PodList | ||||
|  | ||||
| 	if err := k.registry.client.Get(&client.Resource{ | ||||
| 		Kind:  "pod", | ||||
| 		Value: &pods, | ||||
| 	}, podSelector); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	var results []*registry.Result | ||||
|  | ||||
| 	for _, pod := range pods.Items { | ||||
| 		rslts := k.buildPodResults(&pod, nil) | ||||
|  | ||||
| 		for _, r := range rslts { | ||||
| 			results = append(results, r) | ||||
| 		} | ||||
|  | ||||
| 		k.Lock() | ||||
| 		k.pods[pod.Metadata.Name] = &pod | ||||
| 		k.Unlock() | ||||
| 	} | ||||
|  | ||||
| 	return results, nil | ||||
| } | ||||
|  | ||||
| // look through pod annotations, compare against cache if present | ||||
| // and return a list of results to send down the wire. | ||||
| func (k *k8sWatcher) buildPodResults(pod *client.Pod, cache *client.Pod) []*registry.Result { | ||||
| 	var results []*registry.Result | ||||
| 	ignore := make(map[string]bool) | ||||
|  | ||||
| 	if pod.Metadata != nil { | ||||
| 		for ak, av := range pod.Metadata.Annotations { | ||||
| 			// check this annotation kv is a service notation | ||||
| 			if !strings.HasPrefix(ak, annotationPrefix) { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			if len(av) == 0 { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// ignore when we check the cached annotations | ||||
| 			// as we take care of it here | ||||
| 			ignore[ak] = true | ||||
|  | ||||
| 			// compare aginst cache. | ||||
| 			var cacheExists bool | ||||
| 			var cav string | ||||
|  | ||||
| 			if cache != nil && cache.Metadata != nil { | ||||
| 				cav, cacheExists = cache.Metadata.Annotations[ak] | ||||
| 				if cacheExists && len(cav) > 0 && cav == av { | ||||
| 					// service notation exists and is identical - | ||||
| 					// no change result required. | ||||
| 					continue | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			rslt := ®istry.Result{} | ||||
| 			if cacheExists { | ||||
| 				rslt.Action = "update" | ||||
| 			} else { | ||||
| 				rslt.Action = "create" | ||||
| 			} | ||||
|  | ||||
| 			// unmarshal service notation from annotation value | ||||
| 			err := json.Unmarshal([]byte(av), &rslt.Service) | ||||
| 			if err != nil { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			results = append(results, rslt) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// loop through cache annotations to find services | ||||
| 	// not accounted for above, and "delete" them. | ||||
| 	if cache != nil && cache.Metadata != nil { | ||||
| 		for ak, av := range cache.Metadata.Annotations { | ||||
| 			if ignore[ak] { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			// check this annotation kv is a service notation | ||||
| 			if !strings.HasPrefix(ak, annotationPrefix) { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			rslt := ®istry.Result{Action: "delete"} | ||||
| 			// unmarshal service notation from annotation value | ||||
| 			err := json.Unmarshal([]byte(av), &rslt.Service) | ||||
| 			if err != nil { | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			results = append(results, rslt) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return results | ||||
| } | ||||
|  | ||||
| // handleEvent will taken an event from the k8s pods API and do the correct | ||||
| // things with the result, based on the local cache. | ||||
| func (k *k8sWatcher) handleEvent(event client.Event) { | ||||
| 	var pod client.Pod | ||||
| 	if err := json.Unmarshal([]byte(event.Object), &pod); err != nil { | ||||
| 		log.Log("K8s Watcher: Couldnt unmarshal event object from pod") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	switch event.Type { | ||||
| 	case client.Modified: | ||||
| 		// Pod was modified | ||||
|  | ||||
| 		k.RLock() | ||||
| 		cache := k.pods[pod.Metadata.Name] | ||||
| 		k.RUnlock() | ||||
|  | ||||
| 		// service could have been added, edited or removed. | ||||
| 		var results []*registry.Result | ||||
|  | ||||
| 		if pod.Status.Phase == podRunning { | ||||
| 			results = k.buildPodResults(&pod, cache) | ||||
| 		} else { | ||||
| 			// passing in cache might not return all results | ||||
| 			results = k.buildPodResults(&pod, nil) | ||||
| 		} | ||||
|  | ||||
| 		for _, result := range results { | ||||
| 			// pod isnt running | ||||
| 			if pod.Status.Phase != podRunning { | ||||
| 				result.Action = "delete" | ||||
| 			} | ||||
|  | ||||
| 			select { | ||||
| 			case k.next <- result: | ||||
| 			case <-k.stop: | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		k.Lock() | ||||
| 		k.pods[pod.Metadata.Name] = &pod | ||||
| 		k.Unlock() | ||||
| 		return | ||||
|  | ||||
| 	case client.Deleted: | ||||
| 		// Pod was deleted | ||||
| 		// passing in cache might not return all results | ||||
| 		results := k.buildPodResults(&pod, nil) | ||||
|  | ||||
| 		for _, result := range results { | ||||
| 			result.Action = "delete" | ||||
| 			select { | ||||
| 			case k.next <- result: | ||||
| 			case <-k.stop: | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		k.Lock() | ||||
| 		delete(k.pods, pod.Metadata.Name) | ||||
| 		k.Unlock() | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| // Next will block until a new result comes in | ||||
| func (k *k8sWatcher) Next() (*registry.Result, error) { | ||||
| 	select { | ||||
| 	case r := <-k.next: | ||||
| 		return r, nil | ||||
| 	case <-k.stop: | ||||
| 		return nil, errors.New("watcher stopped") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Stop will cancel any requests, and close channels | ||||
| func (k *k8sWatcher) Stop() { | ||||
| 	select { | ||||
| 	case <-k.stop: | ||||
| 		return | ||||
| 	default: | ||||
| 		k.watcher.Stop() | ||||
| 		close(k.stop) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newWatcher(kr *kregistry, opts ...registry.WatchOption) (registry.Watcher, error) { | ||||
| 	var wo registry.WatchOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&wo) | ||||
| 	} | ||||
|  | ||||
| 	selector := podSelector | ||||
| 	if len(wo.Service) > 0 { | ||||
| 		selector = map[string]string{ | ||||
| 			servicePrefix + serviceName(wo.Service): serviceValue, | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Create watch request | ||||
| 	watcher, err := kr.client.Watch(&client.Resource{ | ||||
| 		Kind: "pod", | ||||
| 	}, client.WatchParams(selector)) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	k := &k8sWatcher{ | ||||
| 		registry: kr, | ||||
| 		watcher:  watcher, | ||||
| 		next:     make(chan *registry.Result), | ||||
| 		stop:     make(chan bool), | ||||
| 		pods:     make(map[string]*client.Pod), | ||||
| 	} | ||||
|  | ||||
| 	// update cache, but dont emit changes | ||||
| 	if _, err := k.updateCache(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// range over watch request changes, and invoke | ||||
| 	// the update event | ||||
| 	go func() { | ||||
| 		for event := range watcher.Chan() { | ||||
| 			k.handleEvent(event) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	return k, nil | ||||
| } | ||||
		Reference in New Issue
	
	Block a user