262 lines
5.4 KiB
Go
262 lines
5.4 KiB
Go
|
package kubernetes
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
|
||
|
"github.com/micro/go-micro/registry"
|
||
|
"github.com/micro/go-micro/util/kubernetes/client"
|
||
|
"github.com/micro/go-micro/util/log"
|
||
|
)
|
||
|
|
||
|
type k8sWatcher struct {
|
||
|
registry *kregistry
|
||
|
watcher client.Watcher
|
||
|
next chan *registry.Result
|
||
|
stop chan bool
|
||
|
|
||
|
sync.RWMutex
|
||
|
pods map[string]*client.Pod
|
||
|
}
|
||
|
|
||
|
// build a cache of pods when the watcher starts.
|
||
|
func (k *k8sWatcher) updateCache() ([]*registry.Result, error) {
|
||
|
var pods client.PodList
|
||
|
|
||
|
if err := k.registry.client.Get(&client.Resource{
|
||
|
Kind: "pod",
|
||
|
Value: &pods,
|
||
|
}, podSelector); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var results []*registry.Result
|
||
|
|
||
|
for _, pod := range pods.Items {
|
||
|
rslts := k.buildPodResults(&pod, nil)
|
||
|
|
||
|
for _, r := range rslts {
|
||
|
results = append(results, r)
|
||
|
}
|
||
|
|
||
|
k.Lock()
|
||
|
k.pods[pod.Metadata.Name] = &pod
|
||
|
k.Unlock()
|
||
|
}
|
||
|
|
||
|
return results, nil
|
||
|
}
|
||
|
|
||
|
// look through pod annotations, compare against cache if present
|
||
|
// and return a list of results to send down the wire.
|
||
|
func (k *k8sWatcher) buildPodResults(pod *client.Pod, cache *client.Pod) []*registry.Result {
|
||
|
var results []*registry.Result
|
||
|
ignore := make(map[string]bool)
|
||
|
|
||
|
if pod.Metadata != nil {
|
||
|
for ak, av := range pod.Metadata.Annotations {
|
||
|
// check this annotation kv is a service notation
|
||
|
if !strings.HasPrefix(ak, annotationPrefix) {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if len(av) == 0 {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// ignore when we check the cached annotations
|
||
|
// as we take care of it here
|
||
|
ignore[ak] = true
|
||
|
|
||
|
// compare aginst cache.
|
||
|
var cacheExists bool
|
||
|
var cav string
|
||
|
|
||
|
if cache != nil && cache.Metadata != nil {
|
||
|
cav, cacheExists = cache.Metadata.Annotations[ak]
|
||
|
if cacheExists && len(cav) > 0 && cav == av {
|
||
|
// service notation exists and is identical -
|
||
|
// no change result required.
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
|
||
|
rslt := ®istry.Result{}
|
||
|
if cacheExists {
|
||
|
rslt.Action = "update"
|
||
|
} else {
|
||
|
rslt.Action = "create"
|
||
|
}
|
||
|
|
||
|
// unmarshal service notation from annotation value
|
||
|
err := json.Unmarshal([]byte(av), &rslt.Service)
|
||
|
if err != nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
results = append(results, rslt)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// loop through cache annotations to find services
|
||
|
// not accounted for above, and "delete" them.
|
||
|
if cache != nil && cache.Metadata != nil {
|
||
|
for ak, av := range cache.Metadata.Annotations {
|
||
|
if ignore[ak] {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// check this annotation kv is a service notation
|
||
|
if !strings.HasPrefix(ak, annotationPrefix) {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
rslt := ®istry.Result{Action: "delete"}
|
||
|
// unmarshal service notation from annotation value
|
||
|
err := json.Unmarshal([]byte(av), &rslt.Service)
|
||
|
if err != nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
results = append(results, rslt)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return results
|
||
|
}
|
||
|
|
||
|
// handleEvent will taken an event from the k8s pods API and do the correct
|
||
|
// things with the result, based on the local cache.
|
||
|
func (k *k8sWatcher) handleEvent(event client.Event) {
|
||
|
var pod client.Pod
|
||
|
if err := json.Unmarshal([]byte(event.Object), &pod); err != nil {
|
||
|
log.Log("K8s Watcher: Couldnt unmarshal event object from pod")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
switch event.Type {
|
||
|
case client.Modified:
|
||
|
// Pod was modified
|
||
|
|
||
|
k.RLock()
|
||
|
cache := k.pods[pod.Metadata.Name]
|
||
|
k.RUnlock()
|
||
|
|
||
|
// service could have been added, edited or removed.
|
||
|
var results []*registry.Result
|
||
|
|
||
|
if pod.Status.Phase == podRunning {
|
||
|
results = k.buildPodResults(&pod, cache)
|
||
|
} else {
|
||
|
// passing in cache might not return all results
|
||
|
results = k.buildPodResults(&pod, nil)
|
||
|
}
|
||
|
|
||
|
for _, result := range results {
|
||
|
// pod isnt running
|
||
|
if pod.Status.Phase != podRunning {
|
||
|
result.Action = "delete"
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case k.next <- result:
|
||
|
case <-k.stop:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
k.Lock()
|
||
|
k.pods[pod.Metadata.Name] = &pod
|
||
|
k.Unlock()
|
||
|
return
|
||
|
|
||
|
case client.Deleted:
|
||
|
// Pod was deleted
|
||
|
// passing in cache might not return all results
|
||
|
results := k.buildPodResults(&pod, nil)
|
||
|
|
||
|
for _, result := range results {
|
||
|
result.Action = "delete"
|
||
|
select {
|
||
|
case k.next <- result:
|
||
|
case <-k.stop:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
k.Lock()
|
||
|
delete(k.pods, pod.Metadata.Name)
|
||
|
k.Unlock()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
// Next will block until a new result comes in
|
||
|
func (k *k8sWatcher) Next() (*registry.Result, error) {
|
||
|
select {
|
||
|
case r := <-k.next:
|
||
|
return r, nil
|
||
|
case <-k.stop:
|
||
|
return nil, errors.New("watcher stopped")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Stop will cancel any requests, and close channels
|
||
|
func (k *k8sWatcher) Stop() {
|
||
|
select {
|
||
|
case <-k.stop:
|
||
|
return
|
||
|
default:
|
||
|
k.watcher.Stop()
|
||
|
close(k.stop)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func newWatcher(kr *kregistry, opts ...registry.WatchOption) (registry.Watcher, error) {
|
||
|
var wo registry.WatchOptions
|
||
|
for _, o := range opts {
|
||
|
o(&wo)
|
||
|
}
|
||
|
|
||
|
selector := podSelector
|
||
|
if len(wo.Service) > 0 {
|
||
|
selector = map[string]string{
|
||
|
servicePrefix + serviceName(wo.Service): serviceValue,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create watch request
|
||
|
watcher, err := kr.client.Watch(&client.Resource{
|
||
|
Kind: "pod",
|
||
|
}, client.WatchParams(selector))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
k := &k8sWatcher{
|
||
|
registry: kr,
|
||
|
watcher: watcher,
|
||
|
next: make(chan *registry.Result),
|
||
|
stop: make(chan bool),
|
||
|
pods: make(map[string]*client.Pod),
|
||
|
}
|
||
|
|
||
|
// update cache, but dont emit changes
|
||
|
if _, err := k.updateCache(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// range over watch request changes, and invoke
|
||
|
// the update event
|
||
|
go func() {
|
||
|
for event := range watcher.Chan() {
|
||
|
k.handleEvent(event)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return k, nil
|
||
|
}
|