Merge pull request #988 from micro/k8s-update
Change the k8s runtime notifier update to get the deployment and upda…
This commit is contained in:
		| @@ -2,9 +2,7 @@ | ||||
| package client | ||||
|  | ||||
| import ( | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/util/log" | ||||
| ) | ||||
| @@ -94,16 +92,6 @@ func NewDeployment(name, version string) *Deployment { | ||||
| 		Annotations: map[string]string{}, | ||||
| 	} | ||||
|  | ||||
| 	// TODO: we need to figure out this version stuff | ||||
| 	// might have to add Build to runtime.Service | ||||
| 	buildTime, err := strconv.ParseInt(version, 10, 64) | ||||
| 	if err == nil { | ||||
| 		buildUnixTimeUTC := time.Unix(buildTime, 0) | ||||
| 		Metadata.Annotations["build"] = buildUnixTimeUTC.Format(time.RFC3339) | ||||
| 	} else { | ||||
| 		log.Tracef("could not parse build: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// enable go modules by default | ||||
| 	env := EnvVar{ | ||||
| 		Name:  "GO111MODULE", | ||||
|   | ||||
| @@ -4,7 +4,6 @@ package kubernetes | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| @@ -42,72 +41,6 @@ type kubernetes struct { | ||||
| 	client client.Kubernetes | ||||
| } | ||||
|  | ||||
| // NewRuntime creates new kubernetes runtime | ||||
| func NewRuntime(opts ...runtime.Option) runtime.Runtime { | ||||
| 	// get default options | ||||
| 	options := runtime.Options{} | ||||
|  | ||||
| 	// apply requested options | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// kubernetes client | ||||
| 	client := client.NewClientInCluster() | ||||
|  | ||||
| 	return &kubernetes{ | ||||
| 		options: options, | ||||
| 		closed:  make(chan bool), | ||||
| 		queue:   make(chan *task, 128), | ||||
| 		client:  client, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Init initializes runtime options | ||||
| func (k *kubernetes) Init(opts ...runtime.Option) error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&k.options) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Creates a service | ||||
| func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
|  | ||||
| 	var options runtime.CreateOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// quickly prevalidate the name and version | ||||
| 	name := s.Name | ||||
| 	if len(s.Version) > 0 { | ||||
| 		name = name + "-" + s.Version | ||||
| 	} | ||||
|  | ||||
| 	// format as we'll format in the deployment | ||||
| 	name = client.Format(name) | ||||
|  | ||||
| 	// create new kubernetes micro service | ||||
| 	service := newService(s, options) | ||||
|  | ||||
| 	log.Debugf("Runtime queueing service %s version %s for start action", service.Name, service.Version) | ||||
|  | ||||
| 	// push into start queue | ||||
| 	k.queue <- &task{ | ||||
| 		action:  start, | ||||
| 		service: service, | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // getService queries kubernetes for micro service | ||||
| // NOTE: this function is not thread-safe | ||||
| func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, error) { | ||||
| @@ -117,6 +50,8 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e | ||||
| 		Kind:  "service", | ||||
| 		Value: serviceList, | ||||
| 	} | ||||
|  | ||||
| 	// get the service from k8s | ||||
| 	if err := k.client.Get(r, labels); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -127,6 +62,8 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e | ||||
| 		Kind:  "deployment", | ||||
| 		Value: depList, | ||||
| 	} | ||||
|  | ||||
| 	// get the deployment from k8s | ||||
| 	if err := k.client.Get(d, labels); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -217,6 +154,139 @@ func (k *kubernetes) getService(labels map[string]string) ([]*runtime.Service, e | ||||
| 	return services, nil | ||||
| } | ||||
|  | ||||
| // run runs the runtime management loop | ||||
| func (k *kubernetes) run(events <-chan runtime.Event) { | ||||
| 	t := time.NewTicker(time.Second * 10) | ||||
| 	defer t.Stop() | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-t.C: | ||||
| 			// TODO: figure out what to do here | ||||
| 			// - do we even need the ticker for k8s services? | ||||
| 		case task := <-k.queue: | ||||
| 			// The task queue is used to take actions e.g (CRUD - R) | ||||
| 			switch task.action { | ||||
| 			case start: | ||||
| 				log.Debugf("Runtime starting new service: %s", task.service.Name) | ||||
| 				if err := task.service.Start(k.client); err != nil { | ||||
| 					log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err) | ||||
| 					continue | ||||
| 				} | ||||
| 			case stop: | ||||
| 				log.Debugf("Runtime stopping service: %s", task.service.Name) | ||||
| 				if err := task.service.Stop(k.client); err != nil { | ||||
| 					log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err) | ||||
| 					continue | ||||
| 				} | ||||
| 			case update: | ||||
| 				log.Debugf("Runtime updating service: %s", task.service.Name) | ||||
| 				if err := task.service.Update(k.client); err != nil { | ||||
| 					log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err) | ||||
| 					continue | ||||
| 				} | ||||
| 			default: | ||||
| 				log.Debugf("Runtime received unknown action for service: %s", task.service.Name) | ||||
| 			} | ||||
| 		case event := <-events: | ||||
| 			// NOTE: we only handle Update events for now | ||||
| 			log.Debugf("Runtime received notification event: %v", event) | ||||
| 			switch event.Type { | ||||
| 			case runtime.Update: | ||||
| 				// only process if there's an actual service | ||||
| 				// we do not update all the things individually | ||||
| 				if len(event.Service) == 0 { | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				// set the default labels | ||||
| 				labels := map[string]string{ | ||||
| 					"micro": "service", | ||||
| 					"name":  event.Service, | ||||
| 				} | ||||
|  | ||||
| 				if len(event.Version) > 0 { | ||||
| 					labels["version"] = event.Version | ||||
| 				} | ||||
|  | ||||
| 				// get the deployment status | ||||
| 				deployed := new(client.DeploymentList) | ||||
|  | ||||
| 				// get the existing service rather than creating a new one | ||||
| 				err := k.client.Get(&client.Resource{ | ||||
| 					Kind:  "deployment", | ||||
| 					Value: deployed, | ||||
| 				}, labels) | ||||
|  | ||||
| 				if err != nil { | ||||
| 					log.Debugf("Runtime update failed to get service %s: %v", event.Service, err) | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				// technically we should not receive multiple versions but hey ho | ||||
| 				for _, service := range deployed.Items { | ||||
| 					// update build time annotation | ||||
| 					service.Spec.Template.Metadata.Annotations["build"] = event.Timestamp.Format(time.RFC3339) | ||||
|  | ||||
| 					log.Debugf("Runtime updating service: %s", event.Service) | ||||
| 					if err := k.client.Update(deploymentResource(&service)); err != nil { | ||||
| 						log.Debugf("Runtime failed to update service %s: %v", event.Service, err) | ||||
| 						continue | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		case <-k.closed: | ||||
| 			log.Debugf("Runtime stopped") | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Init initializes runtime options | ||||
| func (k *kubernetes) Init(opts ...runtime.Option) error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&k.options) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Creates a service | ||||
| func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
|  | ||||
| 	var options runtime.CreateOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// quickly prevalidate the name and version | ||||
| 	name := s.Name | ||||
| 	if len(s.Version) > 0 { | ||||
| 		name = name + "-" + s.Version | ||||
| 	} | ||||
|  | ||||
| 	// format as we'll format in the deployment | ||||
| 	name = client.Format(name) | ||||
|  | ||||
| 	// create new kubernetes micro service | ||||
| 	service := newService(s, options) | ||||
|  | ||||
| 	log.Debugf("Runtime queueing service %s version %s for start action", service.Name, service.Version) | ||||
|  | ||||
| 	// push into start queue | ||||
| 	k.queue <- &task{ | ||||
| 		action:  start, | ||||
| 		service: service, | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Read returns all instances of given service | ||||
| func (k *kubernetes) Read(name string, opts ...runtime.ReadOption) ([]*runtime.Service, error) { | ||||
| 	k.Lock() | ||||
| @@ -267,18 +337,11 @@ func (k *kubernetes) List() ([]*runtime.Service, error) { | ||||
|  | ||||
| // Update the service in place | ||||
| func (k *kubernetes) Update(s *runtime.Service) error { | ||||
| 	// parse version into human readable timestamp | ||||
| 	updateTimeStamp, err := strconv.ParseInt(s.Version, 10, 64) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	unixTimeUTC := time.Unix(updateTimeStamp, 0) | ||||
|  | ||||
| 	// create new kubernetes micro service | ||||
| 	service := newService(s, runtime.CreateOptions{}) | ||||
|  | ||||
| 	// update build time annotation | ||||
| 	service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339) | ||||
| 	service.kdeploy.Spec.Template.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) | ||||
|  | ||||
| 	log.Debugf("Runtime queueing service %s for update action", service.Name) | ||||
|  | ||||
| @@ -291,7 +354,7 @@ func (k *kubernetes) Update(s *runtime.Service) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Remove a service | ||||
| // Delete removes a service | ||||
| func (k *kubernetes) Delete(s *runtime.Service) error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
| @@ -310,76 +373,7 @@ func (k *kubernetes) Delete(s *runtime.Service) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // run runs the runtime management loop | ||||
| func (k *kubernetes) run(events <-chan runtime.Event) { | ||||
| 	t := time.NewTicker(time.Second * 10) | ||||
| 	defer t.Stop() | ||||
|  | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-t.C: | ||||
| 			// TODO: figure out what to do here | ||||
| 			// - do we even need the ticker for k8s services? | ||||
| 		case task := <-k.queue: | ||||
| 			switch task.action { | ||||
| 			case start: | ||||
| 				log.Debugf("Runtime starting new service: %s", task.service.Name) | ||||
| 				if err := task.service.Start(k.client); err != nil { | ||||
| 					log.Debugf("Runtime failed to start service %s: %v", task.service.Name, err) | ||||
| 					continue | ||||
| 				} | ||||
| 			case stop: | ||||
| 				log.Debugf("Runtime stopping service: %s", task.service.Name) | ||||
| 				if err := task.service.Stop(k.client); err != nil { | ||||
| 					log.Debugf("Runtime failed to stop service %s: %v", task.service.Name, err) | ||||
| 					continue | ||||
| 				} | ||||
| 			case update: | ||||
| 				log.Debugf("Runtime updating service: %s", task.service.Name) | ||||
| 				if err := task.service.Update(k.client); err != nil { | ||||
| 					log.Debugf("Runtime failed to update service %s: %v", task.service.Name, err) | ||||
| 					continue | ||||
| 				} | ||||
| 			default: | ||||
| 				log.Debugf("Runtime received unknown action for service: %s", task.service.Name) | ||||
| 			} | ||||
| 		case event := <-events: | ||||
| 			// NOTE: we only handle Update events for now | ||||
| 			log.Debugf("Runtime received notification event: %v", event) | ||||
| 			switch event.Type { | ||||
| 			case runtime.Update: | ||||
| 				// parse returned response to timestamp | ||||
| 				updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64) | ||||
| 				if err != nil { | ||||
| 					log.Debugf("Runtime error parsing update build time: %v", err) | ||||
| 					continue | ||||
| 				} | ||||
| 				unixTimeUTC := time.Unix(updateTimeStamp, 0) | ||||
| 				if len(event.Service) > 0 { | ||||
| 					s := &runtime.Service{ | ||||
| 						Name:    event.Service, | ||||
| 						Version: event.Version, | ||||
| 					} | ||||
| 					// create new kubernetes micro service | ||||
| 					service := newService(s, runtime.CreateOptions{}) | ||||
| 					// update build time annotation | ||||
| 					service.kdeploy.Spec.Template.Metadata.Annotations["build"] = unixTimeUTC.Format(time.RFC3339) | ||||
|  | ||||
| 					log.Debugf("Runtime updating service: %s", service.Name) | ||||
| 					if err := service.Update(k.client); err != nil { | ||||
| 						log.Debugf("Runtime failed to update service %s: %v", service.Name, err) | ||||
| 						continue | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		case <-k.closed: | ||||
| 			log.Debugf("Runtime stopped") | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // starts the runtime | ||||
| // Start starts the runtime | ||||
| func (k *kubernetes) Start() error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
| @@ -408,7 +402,7 @@ func (k *kubernetes) Start() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Shutdown the runtime | ||||
| // Stop shuts down the runtime | ||||
| func (k *kubernetes) Stop() error { | ||||
| 	k.Lock() | ||||
| 	defer k.Unlock() | ||||
| @@ -437,3 +431,24 @@ func (k *kubernetes) Stop() error { | ||||
| func (k *kubernetes) String() string { | ||||
| 	return "kubernetes" | ||||
| } | ||||
|  | ||||
| // NewRuntime creates new kubernetes runtime | ||||
| func NewRuntime(opts ...runtime.Option) runtime.Runtime { | ||||
| 	// get default options | ||||
| 	options := runtime.Options{} | ||||
|  | ||||
| 	// apply requested options | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// kubernetes client | ||||
| 	client := client.NewClientInCluster() | ||||
|  | ||||
| 	return &kubernetes{ | ||||
| 		options: options, | ||||
| 		closed:  make(chan bool), | ||||
| 		queue:   make(chan *task, 128), | ||||
| 		client:  client, | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package kubernetes | ||||
|  | ||||
| import ( | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/micro/go-micro/runtime" | ||||
| 	"github.com/micro/go-micro/runtime/kubernetes/client" | ||||
| @@ -34,6 +35,9 @@ func newService(s *runtime.Service, c runtime.CreateOptions) *service { | ||||
| 	kdeploy.Metadata.Annotations["owner"] = "micro" | ||||
| 	kdeploy.Metadata.Annotations["group"] = "micro" | ||||
|  | ||||
| 	// set a build timestamp to the current time | ||||
| 	kdeploy.Metadata.Annotations["build"] = time.Now().Format(time.RFC3339) | ||||
|  | ||||
| 	// define the environment values used by the container | ||||
| 	env := make([]client.EnvVar, 0, len(c.Env)) | ||||
| 	for _, evar := range c.Env { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user