Add watcher and cache for kubernetes registry
This commit is contained in:
		| @@ -3,6 +3,7 @@ package registry | |||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"os" | 	"os" | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
| 	k8s "github.com/GoogleCloudPlatform/kubernetes/pkg/client" | 	k8s "github.com/GoogleCloudPlatform/kubernetes/pkg/client" | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" | ||||||
| @@ -11,6 +12,13 @@ import ( | |||||||
| type KubernetesRegistry struct { | type KubernetesRegistry struct { | ||||||
| 	Client    *k8s.Client | 	Client    *k8s.Client | ||||||
| 	Namespace string | 	Namespace string | ||||||
|  |  | ||||||
|  | 	mtx      sync.RWMutex | ||||||
|  | 	services map[string]Service | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *KubernetesRegistry) Watch() { | ||||||
|  | 	NewKubernetesWatcher(c) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *KubernetesRegistry) Deregister(s Service) error { | func (c *KubernetesRegistry) Deregister(s Service) error { | ||||||
| @@ -22,6 +30,14 @@ func (c *KubernetesRegistry) Register(s Service) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (c *KubernetesRegistry) GetService(name string) (Service, error) { | func (c *KubernetesRegistry) GetService(name string) (Service, error) { | ||||||
|  | 	c.mtx.RLock() | ||||||
|  | 	service, ok := c.services[name] | ||||||
|  | 	c.mtx.RUnlock() | ||||||
|  |  | ||||||
|  | 	if ok { | ||||||
|  | 		return service, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	services, err := c.Client.Services(c.Namespace).List(labels.OneTermEqualSelector("name", name)) | 	services, err := c.Client.Services(c.Namespace).List(labels.OneTermEqualSelector("name", name)) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @@ -70,8 +86,13 @@ func NewKubernetesRegistry() Registry { | |||||||
| 		Host: "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), | 		Host: "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), | ||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
| 	return &KubernetesRegistry{ | 	kr := &KubernetesRegistry{ | ||||||
| 		Client:    client, | 		Client:    client, | ||||||
| 		Namespace: "default", | 		Namespace: "default", | ||||||
|  | 		services:  make(map[string]Service), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	kr.Watch() | ||||||
|  |  | ||||||
|  | 	return kr | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										72
									
								
								registry/kubernetes_watcher.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								registry/kubernetes_watcher.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | |||||||
|  | package registry | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"net" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" | ||||||
|  | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type KubernetesWatcher struct { | ||||||
|  | 	Registry *KubernetesRegistry | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (k *KubernetesWatcher) OnUpdate(services []api.Service) { | ||||||
|  | 	fmt.Println("got update") | ||||||
|  | 	activeServices := util.StringSet{} | ||||||
|  | 	for _, service := range services { | ||||||
|  | 		fmt.Printf("%#v\n", service.ObjectMeta) | ||||||
|  | 		name, exists := service.ObjectMeta.Labels["name"] | ||||||
|  | 		if !exists { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		activeServices.Insert(name) | ||||||
|  | 		serviceIP := net.ParseIP(service.Spec.PortalIP) | ||||||
|  |  | ||||||
|  | 		ks := &KubernetesService{ | ||||||
|  | 			ServiceName: name, | ||||||
|  | 			ServiceNodes: []*KubernetesNode{ | ||||||
|  | 				&KubernetesNode{ | ||||||
|  | 					NodeAddress: serviceIP.String(), | ||||||
|  | 					NodePort:    service.Spec.Port, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		k.Registry.mtx.Lock() | ||||||
|  | 		k.Registry.services[name] = ks | ||||||
|  | 		k.Registry.mtx.Unlock() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	k.Registry.mtx.Lock() | ||||||
|  | 	defer k.Registry.mtx.Unlock() | ||||||
|  | 	for name, _ := range k.Registry.services { | ||||||
|  | 		if !activeServices.Has(name) { | ||||||
|  | 			delete(k.Registry.services, name) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewKubernetesWatcher(kr *KubernetesRegistry) *KubernetesWatcher { | ||||||
|  | 	serviceConfig := config.NewServiceConfig() | ||||||
|  | 	endpointsConfig := config.NewEndpointsConfig() | ||||||
|  |  | ||||||
|  | 	config.NewSourceAPI( | ||||||
|  | 		kr.Client.Services(api.NamespaceAll), | ||||||
|  | 		kr.Client.Endpoints(api.NamespaceAll), | ||||||
|  | 		time.Second*10, | ||||||
|  | 		serviceConfig.Channel("api"), | ||||||
|  | 		endpointsConfig.Channel("api"), | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	ks := &KubernetesWatcher{ | ||||||
|  | 		Registry: kr, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	serviceConfig.RegisterHandler(ks) | ||||||
|  | 	return ks | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user