From 3302fab644c62d84c84c185a33375376d56c1f64 Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 27 Oct 2015 19:08:45 +0000 Subject: [PATCH] Move kubernetes registry to myodc/go-plugins because god damnit stop breaking you.... --- cmd/cmd.go | 1 - registry/kubernetes/kubernetes.go | 118 ------------------------------ registry/kubernetes/watcher.go | 91 ----------------------- 3 files changed, 210 deletions(-) delete mode 100644 registry/kubernetes/kubernetes.go delete mode 100644 registry/kubernetes/watcher.go diff --git a/cmd/cmd.go b/cmd/cmd.go index 51fc4cef..5541fdd0 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -131,7 +131,6 @@ var ( Registries = map[string]func([]string, ...registry.Option) registry.Registry{ "consul": consul.NewRegistry, "etcd": etcd.NewRegistry, - "kubernetes": kubernetes.NewRegistry, "memory": memory.NewRegistry, } diff --git a/registry/kubernetes/kubernetes.go b/registry/kubernetes/kubernetes.go deleted file mode 100644 index 97d58b90..00000000 --- a/registry/kubernetes/kubernetes.go +++ /dev/null @@ -1,118 +0,0 @@ -package kubernetes - -import ( - "fmt" - "os" - "sync" - - "github.com/myodc/go-micro/registry" - - k8s "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" -) - -type kregistry struct { - client *k8s.Client - namespace string - - mtx sync.RWMutex - services map[string]*registry.Service -} - -func (c *kregistry) Deregister(s *registry.Service) error { - return nil -} - -func (c *kregistry) Register(s *registry.Service) error { - return nil -} - -func (c *kregistry) GetService(name string) (*registry.Service, error) { - c.mtx.RLock() - svc, ok := c.services[name] - c.mtx.RUnlock() - - if ok { - return svc, nil - } - - selector := labels.SelectorFromSet(labels.Set{"name": name}) - - services, err := c.client.Services(c.namespace).List(selector, fields.Everything()) - if err != nil { - return nil, err - } - - if len(services.Items) == 0 { - return nil, fmt.Errorf("Service not found") - } - - ks := ®istry.Service{ - Name: name, - } - - for _, item := range services.Items { - ks.Nodes = append(ks.Nodes, ®istry.Node{ - Address: item.Spec.ClusterIP, - Port: item.Spec.Ports[0].Port, - }) - } - - return ks, nil -} - -func (c *kregistry) ListServices() ([]*registry.Service, error) { - c.mtx.RLock() - serviceMap := c.services - c.mtx.RUnlock() - - var services []*registry.Service - - if len(serviceMap) > 0 { - for _, service := range serviceMap { - services = append(services, service) - } - return services, nil - } - - rsp, err := c.client.Services(c.namespace).List(labels.Everything(), fields.Everything()) - if err != nil { - return nil, err - } - - for _, svc := range rsp.Items { - if len(svc.ObjectMeta.Labels["name"]) == 0 { - continue - } - - services = append(services, ®istry.Service{ - Name: svc.ObjectMeta.Labels["name"], - }) - } - - return services, nil -} - -func (c *kregistry) Watch() (registry.Watcher, error) { - return newWatcher(c) -} - -func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { - host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") - if len(addrs) > 0 { - host = addrs[0] - } - - client, _ := k8s.New(&k8s.Config{ - Host: host, - }) - - kr := &kregistry{ - client: client, - namespace: "default", - services: make(map[string]*registry.Service), - } - - return kr -} diff --git a/registry/kubernetes/watcher.go b/registry/kubernetes/watcher.go deleted file mode 100644 index 01806e6b..00000000 --- a/registry/kubernetes/watcher.go +++ /dev/null @@ -1,91 +0,0 @@ -package kubernetes - -import ( - "net" - - "github.com/myodc/go-micro/registry" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/watch" -) - -type watcher struct { - registry *kregistry - watcher watch.Interface -} - -func (k *watcher) update(event watch.Event) { - if event.Object == nil { - return - } - - var service *api.Service - switch obj := event.Object.(type) { - case *api.Service: - service = obj - default: - return - } - - name, exists := service.ObjectMeta.Labels["name"] - if !exists { - return - } - - switch event.Type { - case watch.Added, watch.Modified: - case watch.Deleted: - k.registry.mtx.Lock() - delete(k.registry.services, name) - k.registry.mtx.Unlock() - return - default: - return - } - - serviceIP := net.ParseIP(service.Spec.ClusterIP) - - k.registry.mtx.Lock() - k.registry.services[name] = ®istry.Service{ - Name: name, - Nodes: []*registry.Node{ - ®istry.Node{ - Address: serviceIP.String(), - Port: service.Spec.Ports[0].Port, - }, - }, - } - k.registry.mtx.Unlock() -} - -func (k *watcher) Stop() { - k.watcher.Stop() -} - -func newWatcher(kr *kregistry) (registry.Watcher, error) { - svi := kr.client.Services(api.NamespaceAll) - - services, err := svi.List(labels.Everything(), fields.Everything()) - if err != nil { - return nil, err - } - - watch, err := svi.Watch(labels.Everything(), fields.Everything(), services.ResourceVersion) - if err != nil { - return nil, err - } - - w := &watcher{ - registry: kr, - watcher: watch, - } - - go func() { - for event := range watch.ResultChan() { - w.update(event) - } - }() - - return w, nil -}