package kubernetes import ( "encoding/json" "errors" "strings" "sync" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/util/kubernetes/client" "github.com/micro/go-micro/util/log" ) type k8sWatcher struct { registry *kregistry watcher client.Watcher next chan *registry.Result stop chan bool sync.RWMutex pods map[string]*client.Pod } // build a cache of pods when the watcher starts. func (k *k8sWatcher) updateCache() ([]*registry.Result, error) { var pods client.PodList if err := k.registry.client.Get(&client.Resource{ Kind: "pod", Value: &pods, }, podSelector); err != nil { return nil, err } var results []*registry.Result for _, pod := range pods.Items { rslts := k.buildPodResults(&pod, nil) for _, r := range rslts { results = append(results, r) } k.Lock() k.pods[pod.Metadata.Name] = &pod k.Unlock() } return results, nil } // look through pod annotations, compare against cache if present // and return a list of results to send down the wire. func (k *k8sWatcher) buildPodResults(pod *client.Pod, cache *client.Pod) []*registry.Result { var results []*registry.Result ignore := make(map[string]bool) if pod.Metadata != nil { for ak, av := range pod.Metadata.Annotations { // check this annotation kv is a service notation if !strings.HasPrefix(ak, annotationPrefix) { continue } if len(av) == 0 { continue } // ignore when we check the cached annotations // as we take care of it here ignore[ak] = true // compare aginst cache. var cacheExists bool var cav string if cache != nil && cache.Metadata != nil { cav, cacheExists = cache.Metadata.Annotations[ak] if cacheExists && len(cav) > 0 && cav == av { // service notation exists and is identical - // no change result required. continue } } rslt := ®istry.Result{} if cacheExists { rslt.Action = "update" } else { rslt.Action = "create" } // unmarshal service notation from annotation value err := json.Unmarshal([]byte(av), &rslt.Service) if err != nil { continue } results = append(results, rslt) } } // loop through cache annotations to find services // not accounted for above, and "delete" them. if cache != nil && cache.Metadata != nil { for ak, av := range cache.Metadata.Annotations { if ignore[ak] { continue } // check this annotation kv is a service notation if !strings.HasPrefix(ak, annotationPrefix) { continue } rslt := ®istry.Result{Action: "delete"} // unmarshal service notation from annotation value err := json.Unmarshal([]byte(av), &rslt.Service) if err != nil { continue } results = append(results, rslt) } } return results } // handleEvent will taken an event from the k8s pods API and do the correct // things with the result, based on the local cache. func (k *k8sWatcher) handleEvent(event client.Event) { var pod client.Pod if err := json.Unmarshal([]byte(event.Object), &pod); err != nil { log.Log("K8s Watcher: Couldnt unmarshal event object from pod") return } switch event.Type { case client.Modified: // Pod was modified k.RLock() cache := k.pods[pod.Metadata.Name] k.RUnlock() // service could have been added, edited or removed. var results []*registry.Result if pod.Status.Phase == podRunning { results = k.buildPodResults(&pod, cache) } else { // passing in cache might not return all results results = k.buildPodResults(&pod, nil) } for _, result := range results { // pod isnt running if pod.Status.Phase != podRunning { result.Action = "delete" } select { case k.next <- result: case <-k.stop: return } } k.Lock() k.pods[pod.Metadata.Name] = &pod k.Unlock() return case client.Deleted: // Pod was deleted // passing in cache might not return all results results := k.buildPodResults(&pod, nil) for _, result := range results { result.Action = "delete" select { case k.next <- result: case <-k.stop: return } } k.Lock() delete(k.pods, pod.Metadata.Name) k.Unlock() return } } // Next will block until a new result comes in func (k *k8sWatcher) Next() (*registry.Result, error) { select { case r := <-k.next: return r, nil case <-k.stop: return nil, errors.New("watcher stopped") } } // Stop will cancel any requests, and close channels func (k *k8sWatcher) Stop() { select { case <-k.stop: return default: k.watcher.Stop() close(k.stop) } } func newWatcher(kr *kregistry, opts ...registry.WatchOption) (registry.Watcher, error) { var wo registry.WatchOptions for _, o := range opts { o(&wo) } selector := podSelector if len(wo.Service) > 0 { selector = map[string]string{ servicePrefix + serviceName(wo.Service): serviceValue, } } // Create watch request watcher, err := kr.client.Watch(&client.Resource{ Kind: "pod", }, client.WatchParams(selector)) if err != nil { return nil, err } k := &k8sWatcher{ registry: kr, watcher: watcher, next: make(chan *registry.Result), stop: make(chan bool), pods: make(map[string]*client.Pod), } // update cache, but dont emit changes if _, err := k.updateCache(); err != nil { return nil, err } // range over watch request changes, and invoke // the update event go func() { for event := range watcher.Chan() { k.handleEvent(event) } }() return k, nil }