diff --git a/registry/kubernetes_registry.go b/registry/kubernetes_registry.go index a6fa520a..4181c219 100644 --- a/registry/kubernetes_registry.go +++ b/registry/kubernetes_registry.go @@ -3,6 +3,7 @@ package registry import ( "fmt" "os" + "sync" k8s "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -11,6 +12,13 @@ import ( type KubernetesRegistry struct { Client *k8s.Client Namespace string + + mtx sync.RWMutex + services map[string]Service +} + +func (c *KubernetesRegistry) Watch() { + NewKubernetesWatcher(c) } func (c *KubernetesRegistry) Deregister(s Service) error { @@ -22,6 +30,14 @@ func (c *KubernetesRegistry) Register(s Service) error { } func (c *KubernetesRegistry) GetService(name string) (Service, error) { + c.mtx.RLock() + service, ok := c.services[name] + c.mtx.RUnlock() + + if ok { + return service, nil + } + services, err := c.Client.Services(c.Namespace).List(labels.OneTermEqualSelector("name", name)) if err != nil { return nil, err @@ -70,8 +86,13 @@ func NewKubernetesRegistry() Registry { Host: "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), }) - return &KubernetesRegistry{ + kr := &KubernetesRegistry{ Client: client, Namespace: "default", + services: make(map[string]Service), } + + kr.Watch() + + return kr } diff --git a/registry/kubernetes_watcher.go b/registry/kubernetes_watcher.go new file mode 100644 index 00000000..2804c114 --- /dev/null +++ b/registry/kubernetes_watcher.go @@ -0,0 +1,72 @@ +package registry + +import ( + "fmt" + "net" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type KubernetesWatcher struct { + Registry *KubernetesRegistry +} + +func (k *KubernetesWatcher) OnUpdate(services []api.Service) { + fmt.Println("got update") + activeServices := util.StringSet{} + for _, service := range services { + fmt.Printf("%#v\n", service.ObjectMeta) + name, exists := service.ObjectMeta.Labels["name"] + if !exists { + continue + } + + activeServices.Insert(name) + serviceIP := net.ParseIP(service.Spec.PortalIP) + + ks := &KubernetesService{ + ServiceName: name, + ServiceNodes: []*KubernetesNode{ + &KubernetesNode{ + NodeAddress: serviceIP.String(), + NodePort: service.Spec.Port, + }, + }, + } + + k.Registry.mtx.Lock() + k.Registry.services[name] = ks + k.Registry.mtx.Unlock() + } + + k.Registry.mtx.Lock() + defer k.Registry.mtx.Unlock() + for name, _ := range k.Registry.services { + if !activeServices.Has(name) { + delete(k.Registry.services, name) + } + } +} + +func NewKubernetesWatcher(kr *KubernetesRegistry) *KubernetesWatcher { + serviceConfig := config.NewServiceConfig() + endpointsConfig := config.NewEndpointsConfig() + + config.NewSourceAPI( + kr.Client.Services(api.NamespaceAll), + kr.Client.Endpoints(api.NamespaceAll), + time.Second*10, + serviceConfig.Channel("api"), + endpointsConfig.Channel("api"), + ) + + ks := &KubernetesWatcher{ + Registry: kr, + } + + serviceConfig.RegisterHandler(ks) + return ks +}