add kubernetes registry back since its being used...
This commit is contained in:
		| @@ -24,6 +24,7 @@ import ( | |||||||
| 	// registries | 	// registries | ||||||
| 	"github.com/myodc/go-micro/registry/consul" | 	"github.com/myodc/go-micro/registry/consul" | ||||||
| 	"github.com/myodc/go-micro/registry/etcd" | 	"github.com/myodc/go-micro/registry/etcd" | ||||||
|  | 	"github.com/myodc/go-micro/registry/kubernetes" | ||||||
| 	"github.com/myodc/go-micro/registry/memory" | 	"github.com/myodc/go-micro/registry/memory" | ||||||
|  |  | ||||||
| 	// transport | 	// transport | ||||||
| @@ -60,7 +61,7 @@ var ( | |||||||
| 			Name:   "broker", | 			Name:   "broker", | ||||||
| 			EnvVar: "MICRO_BROKER", | 			EnvVar: "MICRO_BROKER", | ||||||
| 			Value:  "http", | 			Value:  "http", | ||||||
| 			Usage:  "Broker for pub/sub. http, nats, etc", | 			Usage:  "Broker for pub/sub. http, nats, rabbitmq", | ||||||
| 		}, | 		}, | ||||||
| 		cli.StringFlag{ | 		cli.StringFlag{ | ||||||
| 			Name:   "broker_address", | 			Name:   "broker_address", | ||||||
| @@ -71,7 +72,7 @@ var ( | |||||||
| 			Name:   "registry", | 			Name:   "registry", | ||||||
| 			EnvVar: "MICRO_REGISTRY", | 			EnvVar: "MICRO_REGISTRY", | ||||||
| 			Value:  "consul", | 			Value:  "consul", | ||||||
| 			Usage:  "Registry for discovery. memory, consul, etcd", | 			Usage:  "Registry for discovery. memory, consul, etcd, kubernetes", | ||||||
| 		}, | 		}, | ||||||
| 		cli.StringFlag{ | 		cli.StringFlag{ | ||||||
| 			Name:   "registry_address", | 			Name:   "registry_address", | ||||||
| @@ -82,7 +83,7 @@ var ( | |||||||
| 			Name:   "transport", | 			Name:   "transport", | ||||||
| 			EnvVar: "MICRO_TRANSPORT", | 			EnvVar: "MICRO_TRANSPORT", | ||||||
| 			Value:  "http", | 			Value:  "http", | ||||||
| 			Usage:  "Transport mechanism used; http, rabbitmq, etc", | 			Usage:  "Transport mechanism used; http, rabbitmq, nats", | ||||||
| 		}, | 		}, | ||||||
| 		cli.StringFlag{ | 		cli.StringFlag{ | ||||||
| 			Name:   "transport_address", | 			Name:   "transport_address", | ||||||
| @@ -130,6 +131,7 @@ var ( | |||||||
| 	Registries = map[string]func([]string, ...registry.Option) registry.Registry{ | 	Registries = map[string]func([]string, ...registry.Option) registry.Registry{ | ||||||
| 		"consul":     consul.NewRegistry, | 		"consul":     consul.NewRegistry, | ||||||
| 		"etcd":       etcd.NewRegistry, | 		"etcd":       etcd.NewRegistry, | ||||||
|  | 		"kubernetes": kubernetes.NewRegistry, | ||||||
| 		"memory":     memory.NewRegistry, | 		"memory":     memory.NewRegistry, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										117
									
								
								registry/kubernetes/kubernetes.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										117
									
								
								registry/kubernetes/kubernetes.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,117 @@ | |||||||
|  | package kubernetes | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"os" | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	"github.com/myodc/go-micro/registry" | ||||||
|  |  | ||||||
|  | 	k8s "k8s.io/kubernetes/pkg/client/unversioned" | ||||||
|  | 	"k8s.io/kubernetes/pkg/labels" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type kregistry struct { | ||||||
|  | 	client    *k8s.Client | ||||||
|  | 	namespace string | ||||||
|  |  | ||||||
|  | 	mtx      sync.RWMutex | ||||||
|  | 	services map[string]*registry.Service | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *kregistry) Deregister(s *registry.Service) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *kregistry) Register(s *registry.Service) error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *kregistry) GetService(name string) (*registry.Service, error) { | ||||||
|  | 	c.mtx.RLock() | ||||||
|  | 	svc, ok := c.services[name] | ||||||
|  | 	c.mtx.RUnlock() | ||||||
|  |  | ||||||
|  | 	if ok { | ||||||
|  | 		return svc, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	selector := labels.SelectorFromSet(labels.Set{"name": name}) | ||||||
|  |  | ||||||
|  | 	services, err := c.client.Services(c.namespace).List(selector) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(services.Items) == 0 { | ||||||
|  | 		return nil, fmt.Errorf("Service not found") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ks := ®istry.Service{ | ||||||
|  | 		Name: name, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, item := range services.Items { | ||||||
|  | 		ks.Nodes = append(ks.Nodes, ®istry.Node{ | ||||||
|  | 			Address: item.Spec.ClusterIP, | ||||||
|  | 			Port:    item.Spec.Ports[0].Port, | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return ks, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *kregistry) ListServices() ([]*registry.Service, error) { | ||||||
|  | 	c.mtx.RLock() | ||||||
|  | 	serviceMap := c.services | ||||||
|  | 	c.mtx.RUnlock() | ||||||
|  |  | ||||||
|  | 	var services []*registry.Service | ||||||
|  |  | ||||||
|  | 	if len(serviceMap) > 0 { | ||||||
|  | 		for _, service := range serviceMap { | ||||||
|  | 			services = append(services, service) | ||||||
|  | 		} | ||||||
|  | 		return services, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	rsp, err := c.client.Services(c.namespace).List(labels.Everything()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, svc := range rsp.Items { | ||||||
|  | 		if len(svc.ObjectMeta.Labels["name"]) == 0 { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		services = append(services, ®istry.Service{ | ||||||
|  | 			Name: svc.ObjectMeta.Labels["name"], | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return services, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *kregistry) Watch() (registry.Watcher, error) { | ||||||
|  | 	return newWatcher(c) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewRegistry(addrs []string, opts ...registry.Option) registry.Registry { | ||||||
|  | 	host := "http://" + os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT") | ||||||
|  | 	if len(addrs) > 0 { | ||||||
|  | 		host = addrs[0] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	client, _ := k8s.New(&k8s.Config{ | ||||||
|  | 		Host: host, | ||||||
|  | 	}) | ||||||
|  |  | ||||||
|  | 	kr := &kregistry{ | ||||||
|  | 		client:    client, | ||||||
|  | 		namespace: "default", | ||||||
|  | 		services:  make(map[string]*registry.Service), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return kr | ||||||
|  | } | ||||||
							
								
								
									
										91
									
								
								registry/kubernetes/watcher.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										91
									
								
								registry/kubernetes/watcher.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,91 @@ | |||||||
|  | package kubernetes | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"net" | ||||||
|  |  | ||||||
|  | 	"github.com/myodc/go-micro/registry" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | 	"k8s.io/kubernetes/pkg/fields" | ||||||
|  | 	"k8s.io/kubernetes/pkg/labels" | ||||||
|  | 	"k8s.io/kubernetes/pkg/watch" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type watcher struct { | ||||||
|  | 	registry *kregistry | ||||||
|  | 	watcher  watch.Interface | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (k *watcher) update(event watch.Event) { | ||||||
|  | 	if event.Object == nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var service *api.Service | ||||||
|  | 	switch obj := event.Object.(type) { | ||||||
|  | 	case *api.Service: | ||||||
|  | 		service = obj | ||||||
|  | 	default: | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	name, exists := service.ObjectMeta.Labels["name"] | ||||||
|  | 	if !exists { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	switch event.Type { | ||||||
|  | 	case watch.Added, watch.Modified: | ||||||
|  | 	case watch.Deleted: | ||||||
|  | 		k.registry.mtx.Lock() | ||||||
|  | 		delete(k.registry.services, name) | ||||||
|  | 		k.registry.mtx.Unlock() | ||||||
|  | 		return | ||||||
|  | 	default: | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	serviceIP := net.ParseIP(service.Spec.ClusterIP) | ||||||
|  |  | ||||||
|  | 	k.registry.mtx.Lock() | ||||||
|  | 	k.registry.services[name] = ®istry.Service{ | ||||||
|  | 		Name: name, | ||||||
|  | 		Nodes: []*registry.Node{ | ||||||
|  | 			®istry.Node{ | ||||||
|  | 				Address: serviceIP.String(), | ||||||
|  | 				Port:    service.Spec.Ports[0].Port, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	k.registry.mtx.Unlock() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (k *watcher) Stop() { | ||||||
|  | 	k.watcher.Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newWatcher(kr *kregistry) (registry.Watcher, error) { | ||||||
|  | 	svi := kr.client.Services(api.NamespaceAll) | ||||||
|  |  | ||||||
|  | 	services, err := svi.List(labels.Everything()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	watch, err := svi.Watch(labels.Everything(), fields.Everything(), services.ResourceVersion) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	w := &watcher{ | ||||||
|  | 		registry: kr, | ||||||
|  | 		watcher:  watch, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		for event := range watch.ResultChan() { | ||||||
|  | 			w.update(event) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	return w, nil | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user