diff --git a/cmd/cmd.go b/cmd/cmd.go index 874d2813..51fc4cef 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -24,6 +24,7 @@ import ( // registries "github.com/myodc/go-micro/registry/consul" "github.com/myodc/go-micro/registry/etcd" + "github.com/myodc/go-micro/registry/kubernetes" "github.com/myodc/go-micro/registry/memory" // transport @@ -60,7 +61,7 @@ var ( Name: "broker", EnvVar: "MICRO_BROKER", Value: "http", - Usage: "Broker for pub/sub. http, nats, etc", + Usage: "Broker for pub/sub. http, nats, rabbitmq", }, cli.StringFlag{ Name: "broker_address", @@ -71,7 +72,7 @@ var ( Name: "registry", EnvVar: "MICRO_REGISTRY", Value: "consul", - Usage: "Registry for discovery. memory, consul, etcd", + Usage: "Registry for discovery. memory, consul, etcd, kubernetes", }, cli.StringFlag{ Name: "registry_address", @@ -82,7 +83,7 @@ var ( Name: "transport", EnvVar: "MICRO_TRANSPORT", Value: "http", - Usage: "Transport mechanism used; http, rabbitmq, etc", + Usage: "Transport mechanism used; http, rabbitmq, nats", }, cli.StringFlag{ Name: "transport_address", @@ -128,9 +129,10 @@ var ( } Registries = map[string]func([]string, ...registry.Option) registry.Registry{ - "consul": consul.NewRegistry, - "etcd": etcd.NewRegistry, - "memory": memory.NewRegistry, + "consul": consul.NewRegistry, + "etcd": etcd.NewRegistry, + "kubernetes": kubernetes.NewRegistry, + "memory": memory.NewRegistry, } Transports = map[string]func([]string, ...transport.Option) transport.Transport{ diff --git a/registry/kubernetes/kubernetes.go b/registry/kubernetes/kubernetes.go new file mode 100644 index 00000000..f3fba084 --- /dev/null +++ b/registry/kubernetes/kubernetes.go @@ -0,0 +1,117 @@ +package kubernetes + +import ( + "fmt" + "os" + "sync" + + "github.com/myodc/go-micro/registry" + + k8s "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/labels" +) + +type kregistry struct { + client *k8s.Client + namespace string + + mtx sync.RWMutex + services map[string]*registry.Service +} + +func (c *kregistry) Deregister(s *registry.Service) error { + return nil +} + +func (c *kregistry) Register(s *registry.Service) error { + return nil +} + +func (c *kregistry) GetService(name string) (*registry.Service, error) { + c.mtx.RLock() + svc, ok := c.services[name] + c.mtx.RUnlock() + + if ok { + return svc, nil + } + + selector := labels.SelectorFromSet(labels.Set{"name": name}) + + services, err := c.client.Services(c.namespace).List(selector) + if err != nil { + return nil, err + } + + if len(services.Items) == 0 { + return nil, fmt.Errorf("Service not found") + } + + ks := ®istry.Service{ + Name: name, + } + + for _, item := range services.Items { + ks.Nodes = append(ks.Nodes, ®istry.Node{ + Address: item.Spec.ClusterIP, + Port: item.Spec.Ports[0].Port, + }) + } + + return ks, nil +} + +func (c *kregistry) ListServices() ([]*registry.Service, error) { + c.mtx.RLock() + serviceMap := c.services + c.mtx.RUnlock() + + var services []*registry.Service + + if len(serviceMap) > 0 { + for _, service := range serviceMap { + services = append(services, service) + } + return services, nil + } + + rsp, err := c.client.Services(c.namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + + for _, svc := range rsp.Items { + if len(svc.ObjectMeta.Labels["name"]) == 0 { + continue + } + + services = append(services, ®istry.Service{ + Name: svc.ObjectMeta.Labels["name"], + }) + } + + return services, nil +} + +func (c *kregistry) Watch() (registry.Watcher, error) { + return newWatcher(c) +} + +func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { + host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") + if len(addrs) > 0 { + host = addrs[0] + } + + client, _ := k8s.New(&k8s.Config{ + Host: host, + }) + + kr := &kregistry{ + client: client, + namespace: "default", + services: make(map[string]*registry.Service), + } + + return kr +} diff --git a/registry/kubernetes/watcher.go b/registry/kubernetes/watcher.go new file mode 100644 index 00000000..cbb4a158 --- /dev/null +++ b/registry/kubernetes/watcher.go @@ -0,0 +1,91 @@ +package kubernetes + +import ( + "net" + + "github.com/myodc/go-micro/registry" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/watch" +) + +type watcher struct { + registry *kregistry + watcher watch.Interface +} + +func (k *watcher) update(event watch.Event) { + if event.Object == nil { + return + } + + var service *api.Service + switch obj := event.Object.(type) { + case *api.Service: + service = obj + default: + return + } + + name, exists := service.ObjectMeta.Labels["name"] + if !exists { + return + } + + switch event.Type { + case watch.Added, watch.Modified: + case watch.Deleted: + k.registry.mtx.Lock() + delete(k.registry.services, name) + k.registry.mtx.Unlock() + return + default: + return + } + + serviceIP := net.ParseIP(service.Spec.ClusterIP) + + k.registry.mtx.Lock() + k.registry.services[name] = ®istry.Service{ + Name: name, + Nodes: []*registry.Node{ + ®istry.Node{ + Address: serviceIP.String(), + Port: service.Spec.Ports[0].Port, + }, + }, + } + k.registry.mtx.Unlock() +} + +func (k *watcher) Stop() { + k.watcher.Stop() +} + +func newWatcher(kr *kregistry) (registry.Watcher, error) { + svi := kr.client.Services(api.NamespaceAll) + + services, err := svi.List(labels.Everything()) + if err != nil { + return nil, err + } + + watch, err := svi.Watch(labels.Everything(), fields.Everything(), services.ResourceVersion) + if err != nil { + return nil, err + } + + w := &watcher{ + registry: kr, + watcher: watch, + } + + go func() { + for event := range watch.ResultChan() { + w.update(event) + } + }() + + return w, nil +}