From dad05be95ee06a2c79c71fcaa1550eeeb1b12c1f Mon Sep 17 00:00:00 2001 From: ben-toogood Date: Fri, 9 Oct 2020 13:28:15 +0100 Subject: [PATCH] runtime/kubernetes: rewrite to improve support of multiple versions of a single service (#2035) * wip: refactor kubernetes package * runtime/kubernetes: fix invalid labels * runtime/kubernetes: handle delete service not found error * Misc Fixes * runtime: add ServiceAccount option * router/static: return noop table * add kubernetes router * runtime: add port option * store/file: set directory * store/file: pass options to blob store * Revert changes to static router * Fix merge error * runtime/kubernetes: Debug => Error logs * runtime/kubernetes: fix double if --- runtime/kubernetes/kubernetes.go | 550 ++++++---------------------- runtime/kubernetes/namespace.go | 112 ++++++ runtime/kubernetes/service.go | 241 ------------ runtime/kubernetes/util.go | 206 +++++++++++ runtime/options.go | 22 +- util/kubernetes/client/client.go | 249 +++++++------ util/kubernetes/client/options.go | 14 +- util/kubernetes/client/templates.go | 2 +- util/kubernetes/client/util_test.go | 12 +- 9 files changed, 614 insertions(+), 794 deletions(-) create mode 100644 runtime/kubernetes/namespace.go delete mode 100644 runtime/kubernetes/service.go create mode 100644 runtime/kubernetes/util.go diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index afadd244..dc5f13cb 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -2,9 +2,7 @@ package kubernetes import ( - "encoding/base64" "fmt" - "strings" "sync" "time" @@ -19,234 +17,20 @@ import ( type action int type kubernetes struct { - sync.RWMutex + sync.Mutex // options configure runtime options runtime.Options - // indicates if we're running - running bool // client is kubernetes client client client.Client // namespaces which exist namespaces []client.Namespace } -// namespaceExists returns a boolean indicating if a namespace exists -func (k *kubernetes) namespaceExists(name string) (bool, error) { - // populate the cache - if k.namespaces == nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Populating namespace cache") - } - - namespaceList := new(client.NamespaceList) - resource := &client.Resource{Kind: "namespace", Value: namespaceList} - if err := k.client.List(resource); err != nil { - return false, err - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Popualted namespace cache successfully with %v items", len(namespaceList.Items)) - } - k.namespaces = namespaceList.Items - } - - // check if the namespace exists in the cache - for _, n := range k.namespaces { - if n.Metadata.Name == name { - return true, nil - } - } - - return false, nil -} - -// createNamespace creates a new k8s namespace -func (k *kubernetes) createNamespace(namespace string) error { - ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}} - err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns}) - - // ignore err already exists - if err != nil && strings.Contains(err.Error(), "already exists") { - logger.Debugf("Ignoring ErrAlreadyExists for namespace %v: %v", namespace, err) - err = nil - } - - // add to cache - if err == nil && k.namespaces != nil { - k.namespaces = append(k.namespaces, ns) - } - - return err -} - -// getService queries kubernetes for micro service -// NOTE: this function is not thread-safe -func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) { - // get the service status - serviceList := new(client.ServiceList) - r := &client.Resource{ - Kind: "service", - Value: serviceList, - } - - opts = append(opts, client.GetLabels(labels)) - - // get the service from k8s - if err := k.client.Get(r, opts...); err != nil { - return nil, err - } - - // get the deployment status - depList := new(client.DeploymentList) - d := &client.Resource{ - Kind: "deployment", - Value: depList, - } - if err := k.client.Get(d, opts...); err != nil { - return nil, err - } - - // get the pods from k8s - podList := new(client.PodList) - p := &client.Resource{ - Kind: "pod", - Value: podList, - } - if err := k.client.Get(p, opts...); err != nil { - return nil, err - } - - // service map - svcMap := make(map[string]*service) - - // collect info from kubernetes service - for _, kservice := range serviceList.Items { - // name of the service - name := kservice.Metadata.Labels["name"] - // version of the service - version := kservice.Metadata.Labels["version"] - - srv := &service{ - Service: &runtime.Service{ - Name: name, - Version: version, - Metadata: make(map[string]string), - }, - kservice: &kservice, - } - - // set the address - address := kservice.Spec.ClusterIP - port := kservice.Spec.Ports[0] - srv.Service.Metadata["address"] = fmt.Sprintf("%s:%d", address, port.Port) - // set the type of service - srv.Service.Metadata["type"] = kservice.Metadata.Labels["micro"] - - // copy annotations metadata into service metadata - for k, v := range kservice.Metadata.Annotations { - srv.Service.Metadata[k] = v - } - - // save as service - svcMap[name+version] = srv - } - - // collect additional info from kubernetes deployment - for _, kdep := range depList.Items { - // name of the service - name := kdep.Metadata.Labels["name"] - // versio of the service - version := kdep.Metadata.Labels["version"] - - // access existing service map based on name + version - if svc, ok := svcMap[name+version]; ok { - // we're expecting our own service name in metadata - if _, ok := kdep.Metadata.Annotations["name"]; !ok { - continue - } - - // set the service name, version and source - // based on existing annotations we stored - svc.Service.Name = kdep.Metadata.Annotations["name"] - svc.Service.Version = kdep.Metadata.Annotations["version"] - svc.Service.Source = kdep.Metadata.Annotations["source"] - - // delete from metadata - delete(kdep.Metadata.Annotations, "name") - delete(kdep.Metadata.Annotations, "version") - delete(kdep.Metadata.Annotations, "source") - - // copy all annotations metadata into service metadata - for k, v := range kdep.Metadata.Annotations { - svc.Service.Metadata[k] = v - } - - // parse out deployment status and inject into service metadata - if len(kdep.Status.Conditions) > 0 { - status := transformStatus(kdep.Status.Conditions[0].Type) - svc.Status(status, nil) - svc.Metadata["started"] = kdep.Status.Conditions[0].LastUpdateTime - } else { - svc.Status(runtime.Unknown, nil) - } - - // get the real status - for _, item := range podList.Items { - // check the name - if item.Metadata.Labels["name"] != name { - continue - } - // check the version - if item.Metadata.Labels["version"] != version { - continue - } - - status := transformStatus(item.Status.Phase) - - // skip if we can't get the container - if len(item.Status.Containers) == 0 { - continue - } - - // now try get a deeper status - state := item.Status.Containers[0].State - // set start time - if state.Running != nil { - svc.Metadata["started"] = state.Running.Started - } - - // set status from waiting - if v := state.Waiting; v != nil { - status = runtime.Starting - } - - svc.Status(status, nil) - } - - // save deployment - svc.kdeploy = &kdep - } - } - - // collect all the services and return - services := make([]*service, 0, len(serviceList.Items)) - - for _, service := range svcMap { - services = append(services, service) - } - - return services, nil -} - // Init initializes runtime options func (k *kubernetes) Init(opts ...runtime.Option) error { - k.Lock() - defer k.Unlock() - for _, o := range opts { o(&k.options) } - return nil } @@ -313,64 +97,57 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er k.Lock() defer k.Unlock() - options := runtime.CreateOptions{ + // parse the options + options := &runtime.CreateOptions{ Type: k.options.Type, + Image: k.options.Image, Namespace: client.DefaultNamespace, } for _, o := range opts { - o(&options) + o(options) } - // default type if it doesn't exist - if len(options.Type) == 0 { - options.Type = k.options.Type - } - - // default the source if it doesn't exist + // default the service's source and version if len(s.Source) == 0 { s.Source = k.options.Source } + if len(s.Version) == 0 { + s.Version = "latest" + } // ensure the namespace exists - namespace := client.SerializeResourceName(options.Namespace) - // only do this if the namespace is not default - if namespace != "default" { - if exist, err := k.namespaceExists(namespace); err == nil && !exist { - if err := k.createNamespace(namespace); err != nil { - if logger.V(logger.WarnLevel, logger.DefaultLogger) { - logger.Warnf("Error creating namespace %v: %v", namespace, err) - } - return err - } - } else if err != nil { - if logger.V(logger.WarnLevel, logger.DefaultLogger) { - logger.Warnf("Error checking namespace %v exists: %v", namespace, err) - } - return err - } - } - // determine the image from the source and options - options.Image = k.getImage(s, options) - - // create a secret for the credentials if some where provided - if len(options.Secrets) > 0 { - if err := k.createCredentials(s, options); err != nil { - if logger.V(logger.WarnLevel, logger.DefaultLogger) { - logger.Warnf("Error generating auth credentials for service: %v", err) - } - return err - } - - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Generated auth credentials for service %v", s.Name) - } + if err := k.ensureNamepaceExists(options.Namespace); err != nil { + return nil } - // create new service - service := newService(s, options) + // create a secret for the deployment + if err := k.createCredentials(s, options); err != nil { + return err + } - // start the service - return service.Start(k.client, client.CreateNamespace(options.Namespace)) + // create the deployment + if err := k.client.Create(client.NewDeployment(s, options), client.CreateNamespace(options.Namespace)); err != nil { + if parseError(err).Reason == "AlreadyExists" { + return runtime.ErrAlreadyExists + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to create deployment: %v", err) + } + return err + } + + // create the service, one could already exist for another version so ignore ErrAlreadyExists + if err := k.client.Create(client.NewService(s, options), client.CreateNamespace(options.Namespace)); err != nil { + if parseError(err).Reason == "AlreadyExists" { + return nil + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to create service: %v", err) + } + return err + } + + return nil } // Read returns all instances of given service @@ -378,88 +155,94 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error k.Lock() defer k.Unlock() - // set the default labels - labels := map[string]string{} - + // parse the options options := runtime.ReadOptions{ Namespace: client.DefaultNamespace, } - for _, o := range opts { o(&options) } + // construct the query + labels := map[string]string{} if len(options.Service) > 0 { labels["name"] = client.Format(options.Service) } - - // add version to labels if a version has been supplied if len(options.Version) > 0 { labels["version"] = client.Format(options.Version) } - if len(options.Type) > 0 { - labels["micro"] = options.Type + labels["micro"] = client.Format(options.Type) } - srvs, err := k.getService(labels, client.GetNamespace(options.Namespace)) - if err != nil { - return nil, err - } - - var services []*runtime.Service - for _, service := range srvs { - services = append(services, service.Service) - } - - return services, nil + // lookup all the serivces which match this query, if one service has two different versions, + // they'll be returned as two seperate resullts + return k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels)) } // Update the service in place func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error { + k.Lock() + defer k.Unlock() + + // parse the options options := runtime.UpdateOptions{ Namespace: client.DefaultNamespace, } - for _, o := range opts { o(&options) } + // construct the query labels := map[string]string{} - if len(s.Name) > 0 { labels["name"] = client.Format(s.Name) } - if len(s.Version) > 0 { labels["version"] = client.Format(s.Version) } - // get the existing service - services, err := k.getService(labels, client.GetNamespace(options.Namespace)) - if err != nil { + // get the existing deployments + depList := new(client.DeploymentList) + d := &client.Resource{ + Kind: "deployment", + Value: depList, + } + depOpts := []client.GetOption{ + client.GetNamespace(options.Namespace), + client.GetLabels(labels), + } + if err := k.client.Get(d, depOpts...); err != nil { return err + } else if len(depList.Items) == 0 { + return runtime.ErrNotFound } - // update the relevant services - for _, service := range services { - // nil check - if service.kdeploy.Metadata == nil || service.kdeploy.Metadata.Annotations == nil { - md := new(client.Metadata) - md.Annotations = make(map[string]string) - service.kdeploy.Metadata = md + // update the deployments which match the query + for _, dep := range depList.Items { + // the service wan't created by the k8s runtime + if dep.Metadata == nil || dep.Metadata.Annotations == nil { + continue } // update metadata for k, v := range s.Metadata { - service.kdeploy.Metadata.Annotations[k] = v + dep.Metadata.Annotations[k] = v } // update build time annotation - service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) + dep.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix()) - // update the service - if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil { + // update the deployment + res := &client.Resource{ + Kind: "deployment", + Name: resourceName(s), + Value: &dep, + } + if err := k.client.Update(res, client.UpdateNamespace(options.Namespace)); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to update deployment: %v", err) + } return err } } @@ -469,6 +252,10 @@ func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) er // Delete removes a service func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error { + k.Lock() + defer k.Unlock() + + // parse the options options := runtime.DeleteOptions{ Namespace: client.DefaultNamespace, } @@ -476,22 +263,47 @@ func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) er o(&options) } - k.Lock() - defer k.Unlock() - - // create new kubernetes micro service - service := newService(s, runtime.CreateOptions{ + // delete the deployment + dep := client.NewDeployment(s, &runtime.CreateOptions{ Type: k.options.Type, Namespace: options.Namespace, }) + if err := k.client.Delete(dep, client.DeleteNamespace(options.Namespace)); err != nil { + if err == api.ErrNotFound { + return runtime.ErrNotFound + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to delete deployment: %v", err) + } + return err + } - // delete the service credentials - ns := client.DeleteNamespace(options.Namespace) - k.client.Delete(&client.Resource{Name: credentialsName(s), Kind: "secret"}, ns) + // delete the credentials + if err := k.deleteCredentials(s, &runtime.CreateOptions{Namespace: options.Namespace}); err != nil { + return err + } - if err := service.Stop(k.client, ns); err == api.ErrNotFound { - return runtime.ErrNotFound - } else if err != nil { + // if there are more deployments for this service, then don't delete it + labels := map[string]string{} + if len(s.Name) > 0 { + labels["name"] = client.Format(s.Name) + } + + // get the existing services. todo: refactor to just get the deployments + services, err := k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels)) + if err != nil || len(services) > 0 { + return err + } + + // delete the service + srv := client.NewService(s, &runtime.CreateOptions{ + Type: k.options.Type, + Namespace: options.Namespace, + }) + if err := k.client.Delete(srv, client.DeleteNamespace(options.Namespace)); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Runtime failed to delete service: %v", err) + } return err } @@ -500,30 +312,11 @@ func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) er // Start starts the runtime func (k *kubernetes) Start() error { - k.Lock() - defer k.Unlock() - - // already running - if k.running { - return nil - } - - // set running - k.running = true return nil } // Stop shuts down the runtime func (k *kubernetes) Stop() error { - k.Lock() - defer k.Unlock() - - if !k.running { - return nil - } - - // set not running - k.running = false return nil } @@ -553,106 +346,3 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime { client: client, } } - -func (k *kubernetes) getImage(s *runtime.Service, options runtime.CreateOptions) string { - // use the image when its specified - if len(options.Image) > 0 { - return options.Image - } - - if len(k.options.Image) > 0 { - return k.options.Image - } - - return "" -} -func (k *kubernetes) createCredentials(service *runtime.Service, options runtime.CreateOptions) error { - data := make(map[string]string, len(options.Secrets)) - for key, value := range options.Secrets { - data[key] = base64.StdEncoding.EncodeToString([]byte(value)) - } - - // construct the k8s secret object - secret := &client.Secret{ - Type: "Opaque", - Data: data, - Metadata: &client.Metadata{ - Name: credentialsName(service), - Namespace: options.Namespace, - }, - } - - // crete the secret in kubernetes - name := credentialsName(service) - return k.client.Create(&client.Resource{ - Kind: "secret", Name: name, Value: secret, - }, client.CreateNamespace(options.Namespace)) -} - -func credentialsName(service *runtime.Service) string { - name := fmt.Sprintf("%v-%v-credentials", service.Name, service.Version) - return client.SerializeResourceName(name) -} - -func (k *kubernetes) CreateNamespace(ns string) error { - err := k.client.Create(&client.Resource{ - Kind: "namespace", - Value: client.Namespace{ - Metadata: &client.Metadata{ - Name: ns, - }, - }, - }) - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Error creating namespace %v: %v", ns, err) - } - } - return err -} - -func (k *kubernetes) DeleteNamespace(ns string) error { - err := k.client.Delete(&client.Resource{ - Kind: "namespace", - Name: ns, - }) - if err != nil { - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("Error deleting namespace %v: %v", ns, err) - } - } - } - return err -} - -// transformStatus takes a deployment status (deploymentcondition.type) and transforms it into a -// runtime service status, e.g. containercreating => starting -func transformStatus(depStatus string) runtime.ServiceStatus { - switch strings.ToLower(depStatus) { - case "pending": - return runtime.Starting - case "containercreating": - return runtime.Starting - case "imagepullbackoff": - return runtime.Error - case "crashloopbackoff": - return runtime.Error - case "error": - return runtime.Error - case "running": - return runtime.Running - case "available": - return runtime.Running - case "succeeded": - return runtime.Stopped - case "failed": - return runtime.Error - case "waiting": - return runtime.Starting - case "terminated": - return runtime.Stopped - default: - return runtime.Unknown - } -} diff --git a/runtime/kubernetes/namespace.go b/runtime/kubernetes/namespace.go new file mode 100644 index 00000000..9a57a633 --- /dev/null +++ b/runtime/kubernetes/namespace.go @@ -0,0 +1,112 @@ +package kubernetes + +import ( + "strings" + + "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/util/kubernetes/client" +) + +func (k *kubernetes) ensureNamepaceExists(ns string) error { + namespace := client.Format(ns) + if namespace == client.DefaultNamespace { + return nil + } + + exist, err := k.namespaceExists(namespace) + if err == nil && exist { + return nil + } + if err != nil { + if logger.V(logger.WarnLevel, logger.DefaultLogger) { + logger.Warnf("Error checking namespace %v exists: %v", namespace, err) + } + return err + } + + if err := k.createNamespace(namespace); err != nil { + if logger.V(logger.WarnLevel, logger.DefaultLogger) { + logger.Warnf("Error creating namespace %v: %v", namespace, err) + } + return err + } + + return nil +} + +// namespaceExists returns a boolean indicating if a namespace exists +func (k *kubernetes) namespaceExists(name string) (bool, error) { + // populate the cache + if k.namespaces == nil { + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Populating namespace cache") + } + + namespaceList := new(client.NamespaceList) + resource := &client.Resource{Kind: "namespace", Value: namespaceList} + if err := k.client.List(resource); err != nil { + return false, err + } + + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("Popualted namespace cache successfully with %v items", len(namespaceList.Items)) + } + k.namespaces = namespaceList.Items + } + + // check if the namespace exists in the cache + for _, n := range k.namespaces { + if n.Metadata.Name == name { + return true, nil + } + } + + return false, nil +} + +// createNamespace creates a new k8s namespace +func (k *kubernetes) createNamespace(namespace string) error { + ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}} + err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns}) + + // ignore err already exists + if err != nil && strings.Contains(err.Error(), "already exists") { + logger.Debugf("Ignoring ErrAlreadyExists for namespace %v: %v", namespace, err) + err = nil + } + + // add to cache + if err == nil && k.namespaces != nil { + k.namespaces = append(k.namespaces, ns) + } + + return err +} + +func (k *kubernetes) CreateNamespace(ns string) error { + err := k.client.Create(&client.Resource{ + Kind: "namespace", + Value: client.Namespace{ + Metadata: &client.Metadata{ + Name: ns, + }, + }, + }) + if err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error creating namespace %v: %v", ns, err) + } + } + return err +} + +func (k *kubernetes) DeleteNamespace(ns string) error { + err := k.client.Delete(&client.Resource{ + Kind: "namespace", + Name: ns, + }) + if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error deleting namespace %v: %v", ns, err) + } + return err +} diff --git a/runtime/kubernetes/service.go b/runtime/kubernetes/service.go deleted file mode 100644 index 29886d15..00000000 --- a/runtime/kubernetes/service.go +++ /dev/null @@ -1,241 +0,0 @@ -package kubernetes - -import ( - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/micro/go-micro/v3/logger" - "github.com/micro/go-micro/v3/runtime" - "github.com/micro/go-micro/v3/util/kubernetes/api" - "github.com/micro/go-micro/v3/util/kubernetes/client" -) - -type service struct { - // service to manage - *runtime.Service - // Kubernetes service - kservice *client.Service - // Kubernetes deployment - kdeploy *client.Deployment -} - -func parseError(err error) *api.Status { - status := new(api.Status) - json.Unmarshal([]byte(err.Error()), &status) - return status -} - -func newService(s *runtime.Service, c runtime.CreateOptions) *service { - // use pre-formatted name/version - name := client.Format(s.Name) - version := client.Format(s.Version) - - kservice := client.NewService(name, version, c.Type, c.Namespace) - kdeploy := client.NewDeployment(name, version, c.Type, c.Namespace) - - // ensure the metadata is set - if kdeploy.Spec.Template.Metadata.Annotations == nil { - kdeploy.Spec.Template.Metadata.Annotations = make(map[string]string) - } - - // create if non existent - if s.Metadata == nil { - s.Metadata = make(map[string]string) - } - - // add the service metadata to the k8s labels, do this first so we - // don't override any labels used by the runtime, e.g. name - for k, v := range s.Metadata { - kdeploy.Metadata.Annotations[k] = v - } - - // attach our values to the deployment; name, version, source - kdeploy.Metadata.Annotations["name"] = s.Name - kdeploy.Metadata.Annotations["version"] = s.Version - kdeploy.Metadata.Annotations["source"] = s.Source - - // associate owner:group to be later augmented - kdeploy.Metadata.Annotations["owner"] = "micro" - kdeploy.Metadata.Annotations["group"] = "micro" - - // update the deployment is a custom source is provided - if len(c.Image) > 0 { - for i := range kdeploy.Spec.Template.PodSpec.Containers { - kdeploy.Spec.Template.PodSpec.Containers[i].Image = c.Image - kdeploy.Spec.Template.PodSpec.Containers[i].Command = []string{} - kdeploy.Spec.Template.PodSpec.Containers[i].Args = []string{} - } - } - - // define the environment values used by the container - env := make([]client.EnvVar, 0, len(c.Env)) - for _, evar := range c.Env { - evarPair := strings.Split(evar, "=") - env = append(env, client.EnvVar{Name: evarPair[0], Value: evarPair[1]}) - } - - // if secrets were provided, pass them to the service - for key := range c.Secrets { - env = append(env, client.EnvVar{ - Name: key, - ValueFrom: &client.EnvVarSource{ - SecretKeyRef: &client.SecretKeySelector{ - Name: credentialsName(s), - Key: key, - }, - }, - }) - } - - // if environment has been supplied update deployment default environment - if len(env) > 0 { - kdeploy.Spec.Template.PodSpec.Containers[0].Env = append(kdeploy.Spec.Template.PodSpec.Containers[0].Env, env...) - } - - // set the command if specified - if len(c.Command) > 0 { - kdeploy.Spec.Template.PodSpec.Containers[0].Command = c.Command - } - - if len(c.Args) > 0 { - kdeploy.Spec.Template.PodSpec.Containers[0].Args = c.Args - } - - // apply resource limits - if c.Resources != nil { - resLimits := &client.ResourceLimits{} - if c.Resources.CPU > 0 { - resLimits.CPU = fmt.Sprintf("%vm", c.Resources.CPU) - } - if c.Resources.Mem > 0 { - resLimits.Memory = fmt.Sprintf("%vMi", c.Resources.Mem) - } - if c.Resources.Disk > 0 { - resLimits.EphemeralStorage = fmt.Sprintf("%vMi", c.Resources.Disk) - } - - kdeploy.Spec.Template.PodSpec.Containers[0].Resources = &client.ResourceRequirements{Limits: resLimits} - } - - // mount volumes - var volumes []client.Volume - var mounts []client.VolumeMount - for name, path := range c.Volumes { - volumes = append(volumes, client.Volume{ - Name: name, - PersistentVolumeClaim: client.PersistentVolumeClaimVolumeSource{ClaimName: name}, - }) - - mounts = append(mounts, client.VolumeMount{Name: name, MountPath: path}) - } - kdeploy.Spec.Template.PodSpec.Volumes = volumes - kdeploy.Spec.Template.PodSpec.Containers[0].VolumeMounts = mounts - - return &service{ - Service: s, - kservice: kservice, - kdeploy: kdeploy, - } -} - -func deploymentResource(d *client.Deployment) *client.Resource { - return &client.Resource{ - Name: d.Metadata.Name, - Kind: "deployment", - Value: d, - } -} - -func serviceResource(s *client.Service) *client.Resource { - return &client.Resource{ - Name: s.Metadata.Name, - Kind: "service", - Value: s, - } -} - -// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects -func (s *service) Start(k client.Client, opts ...client.CreateOption) error { - // create deployment first; if we fail, we dont create service - if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to create deployment: %v", err) - } - s.Status(runtime.Error, err) - v := parseError(err) - if v.Reason == "AlreadyExists" { - return runtime.ErrAlreadyExists - } - return err - } - // create service now that the deployment has been created - if err := k.Create(serviceResource(s.kservice), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to create service: %v", err) - } - s.Status(runtime.Error, err) - v := parseError(err) - if v.Reason == "AlreadyExists" { - return runtime.ErrAlreadyExists - } - return err - } - - s.Status(runtime.Running, nil) - - return nil -} - -func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error { - // first attempt to delete service - if err := k.Delete(serviceResource(s.kservice), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to delete service: %v", err) - } - s.Status(runtime.Error, err) - return err - } - // delete deployment once the service has been deleted - if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to delete deployment: %v", err) - } - s.Status(runtime.Error, err) - return err - } - - s.Status(runtime.Stopped, nil) - - return nil -} - -func (s *service) Update(k client.Client, opts ...client.UpdateOption) error { - if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to update deployment: %v", err) - } - s.Status(runtime.Error, err) - return err - } - if err := k.Update(serviceResource(s.kservice), opts...); err != nil { - if logger.V(logger.DebugLevel, logger.DefaultLogger) { - logger.Debugf("Runtime failed to update service: %v", err) - } - return err - } - - return nil -} - -func (s *service) Status(status runtime.ServiceStatus, err error) { - s.Service.Status = status - s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339) - - if err == nil { - delete(s.Metadata, "error") - return - } - s.Metadata["error"] = err.Error() -} diff --git a/runtime/kubernetes/util.go b/runtime/kubernetes/util.go new file mode 100644 index 00000000..9f1ed54e --- /dev/null +++ b/runtime/kubernetes/util.go @@ -0,0 +1,206 @@ +package kubernetes + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + + "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/runtime" + "github.com/micro/go-micro/v3/util/kubernetes/api" + "github.com/micro/go-micro/v3/util/kubernetes/client" +) + +// getServices queries kubernetes for services. It gets information from both the pods and the +// deployments +func (k *kubernetes) getServices(opts ...client.GetOption) ([]*runtime.Service, error) { + // get the deployments + depList := new(client.DeploymentList) + d := &client.Resource{ + Kind: "deployment", + Value: depList, + } + if err := k.client.Get(d, opts...); err != nil { + return nil, err + } + + srvMap := make(map[string]*runtime.Service, len(depList.Items)) + + // loop through the services and create a deployment for each + for _, kdep := range depList.Items { + srv := &runtime.Service{ + Name: kdep.Metadata.Labels["name"], + Version: kdep.Metadata.Labels["version"], + Source: kdep.Metadata.Labels["source"], + Metadata: kdep.Metadata.Annotations, + } + + // this metadata was injected by the k8s runtime + delete(srv.Metadata, "name") + delete(srv.Metadata, "version") + delete(srv.Metadata, "source") + + // parse out deployment status and inject into service metadata + if len(kdep.Status.Conditions) > 0 { + srv.Status = transformStatus(kdep.Status.Conditions[0].Type) + srv.Metadata["started"] = kdep.Status.Conditions[0].LastUpdateTime + } else { + srv.Status = runtime.Unknown + } + + srvMap[resourceName(srv)] = srv + } + + // get the pods from k8s + podList := new(client.PodList) + p := &client.Resource{ + Kind: "pod", + Value: podList, + } + if err := k.client.Get(p, opts...); err != nil { + logger.Errorf("Error fetching pods: %v", err) + return nil, nil + } + + for _, item := range podList.Items { + // skip if we can't get the container + if len(item.Status.Containers) == 0 { + continue + } + + // lookup the service in the map + key := resourceName(&runtime.Service{ + Name: item.Metadata.Labels["name"], + Version: item.Metadata.Labels["version"], + }) + srv, ok := srvMap[key] + if !ok { + continue + } + + // use the pod status over the deployment status (contains more details) + srv.Status = transformStatus(item.Status.Phase) + + // set start time + state := item.Status.Containers[0].State + if state.Running != nil { + srv.Metadata["started"] = state.Running.Started + } + + // set status from waiting + if v := state.Waiting; v != nil { + srv.Status = runtime.Pending + } + } + + // turn the map into an array + services := make([]*runtime.Service, 0, len(srvMap)) + for _, srv := range srvMap { + services = append(services, srv) + } + return services, nil +} + +func (k *kubernetes) createCredentials(service *runtime.Service, options *runtime.CreateOptions) error { + if len(options.Secrets) == 0 { + return nil + } + + data := make(map[string]string, len(options.Secrets)) + for key, value := range options.Secrets { + data[key] = base64.StdEncoding.EncodeToString([]byte(value)) + } + + // construct the k8s secret object + secret := &client.Secret{ + Type: "Opaque", + Data: data, + Metadata: &client.Metadata{ + Name: resourceName(service), + Namespace: options.Namespace, + }, + } + + // crete the secret in kubernetes + err := k.client.Create(&client.Resource{ + Kind: "secret", + Name: resourceName(service), + Value: secret, + }, client.CreateNamespace(options.Namespace)) + + // ignore the error if the creds already exist + if err == nil || parseError(err).Reason == "AlreadyExists" { + return nil + } + + if logger.V(logger.WarnLevel, logger.DefaultLogger) { + logger.Warnf("Error generating auth credentials for service: %v", err) + } + return err +} + +func (k *kubernetes) deleteCredentials(service *runtime.Service, options *runtime.CreateOptions) error { + // construct the k8s secret object + secret := &client.Secret{ + Type: "Opaque", + Metadata: &client.Metadata{ + Name: resourceName(service), + Namespace: options.Namespace, + }, + } + + // crete the secret in kubernetes + err := k.client.Delete(&client.Resource{ + Kind: "secret", + Name: resourceName(service), + Value: secret, + }, client.DeleteNamespace(options.Namespace)) + + if err != nil && logger.V(logger.WarnLevel, logger.DefaultLogger) { + logger.Warnf("Error deleting auth credentials for service: %v", err) + } + + return err +} + +func resourceName(srv *runtime.Service) string { + return fmt.Sprintf("%v-%v", client.Format(srv.Name), client.Format(srv.Version)) +} + +// transformStatus takes a deployment status (deploymentcondition.type) and transforms it into a +// runtime service status, e.g. containercreating => starting +func transformStatus(depStatus string) runtime.ServiceStatus { + switch strings.ToLower(depStatus) { + case "pending": + return runtime.Pending + case "containercreating": + return runtime.Starting + case "imagepullbackoff": + return runtime.Error + case "crashloopbackoff": + return runtime.Error + case "error": + return runtime.Error + case "running": + return runtime.Running + case "available": + return runtime.Running + case "succeeded": + return runtime.Stopped + case "failed": + return runtime.Error + case "waiting": + return runtime.Pending + case "terminated": + return runtime.Stopped + default: + return runtime.Unknown + } +} + +func parseError(err error) *api.Status { + status := new(api.Status) + json.Unmarshal([]byte(err.Error()), &status) + return status +} diff --git a/runtime/options.go b/runtime/options.go index 3ec72aec..d460460c 100644 --- a/runtime/options.go +++ b/runtime/options.go @@ -11,14 +11,14 @@ type Option func(o *Options) // Options configure runtime type Options struct { + // Service type to manage + Type string // Client to use when making requests Client client.Client // Base image to use Image string // Source of the services repository Source string - // Service type to manage - Type string } // WithSource sets the base image / repository @@ -71,6 +71,8 @@ type CreateOptions struct { Retries int // Specify the image to use Image string + // Port to expose + Port string // Namespace to create the service in Namespace string // Specify the context to use @@ -81,6 +83,8 @@ type CreateOptions struct { Resources *Resources // Volumes to mount Volumes map[string]string + // ServiceAccount to start the container with + ServiceAccount string } // ReadOptions queries runtime services @@ -132,6 +136,13 @@ func CreateEntrypoint(e string) CreateOption { } } +// WithServiceAccount sets the ServiceAccount +func WithServiceAccount(s string) CreateOption { + return func(o *CreateOptions) { + o.ServiceAccount = s + } +} + // WithSecret sets a secret to provide the service with func WithSecret(key, value string) CreateOption { return func(o *CreateOptions) { @@ -191,6 +202,13 @@ func WithVolume(name, path string) CreateOption { } } +// WithPort sets the port to expose +func WithPort(p string) CreateOption { + return func(o *CreateOptions) { + o.Port = p + } +} + // ResourceLimits sets the resources for the service to use func ResourceLimits(r *Resources) CreateOption { return func(o *CreateOptions) { diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index 707f7ac6..261f90fc 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -5,15 +5,18 @@ import ( "bytes" "crypto/tls" "errors" + "fmt" "io" "io/ioutil" "net/http" "os" "path" "regexp" + "strconv" "strings" "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/runtime" "github.com/micro/go-micro/v3/util/kubernetes/api" ) @@ -26,6 +29,8 @@ var ( DefaultImage = "micro/go-micro" // DefaultNamespace is the default k8s namespace DefaultNamespace = "default" + // DefaultPort to expose on a service + DefaultPort = 8080 ) // Client ... @@ -79,12 +84,6 @@ var ( nameRegex = regexp.MustCompile("[^a-zA-Z0-9]+") ) -// SerializeResourceName removes all spacial chars from a string so it -// can be used as a k8s resource name -func SerializeResourceName(ns string) string { - return nameRegex.ReplaceAllString(ns, "-") -} - // Get queries API objects and stores the result in r func (c *client) Get(r *Resource, opts ...GetOption) error { options := GetOptions{ @@ -226,118 +225,154 @@ func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) { } // NewService returns default micro kubernetes service definition -func NewService(name, version, typ, namespace string) *Service { - if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Tracef("kubernetes default service: name: %s, version: %s", name, version) +func NewService(s *runtime.Service, opts *runtime.CreateOptions) *Resource { + labels := map[string]string{ + "name": Format(s.Name), + "version": Format(s.Version), + "micro": Format(opts.Type), } - Labels := map[string]string{ - "name": name, - "version": version, - "micro": typ, + metadata := &Metadata{ + Name: Format(s.Name), + Namespace: Format(opts.Namespace), + Version: Format(s.Version), + Labels: labels, } - svcName := name - if len(version) > 0 { - // API service object name joins name and version over "-" - svcName = strings.Join([]string{name, version}, "-") + port := DefaultPort + if len(opts.Port) > 0 { + port, _ = strconv.Atoi(opts.Port) } - if len(namespace) == 0 { - namespace = DefaultNamespace - } - - Metadata := &Metadata{ - Name: svcName, - Namespace: SerializeResourceName(namespace), - Version: version, - Labels: Labels, - } - - Spec := &ServiceSpec{ - Type: "ClusterIP", - Selector: Labels, - Ports: []ServicePort{{ - "service-port", 8080, "", - }}, - } - - return &Service{ - Metadata: Metadata, - Spec: Spec, - } -} - -// NewService returns default micro kubernetes deployment definition -func NewDeployment(name, version, typ, namespace string) *Deployment { - if logger.V(logger.TraceLevel, logger.DefaultLogger) { - logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version) - } - - Labels := map[string]string{"name": name} - if len(typ) > 0 { - Labels["micro"] = typ - } - if len(version) > 0 { - Labels["version"] = version - } - - depName := name - if len(version) > 0 { - // API deployment object name joins name and version over "-" - depName = strings.Join([]string{name, version}, "-") - } - - if len(namespace) == 0 { - namespace = DefaultNamespace - } - - Metadata := &Metadata{ - Name: depName, - Namespace: SerializeResourceName(namespace), - Version: version, - Labels: Labels, - Annotations: map[string]string{}, - } - - // enable go modules by default - env := EnvVar{ - Name: "GO111MODULE", - Value: "on", - } - - Spec := &DeploymentSpec{ - Replicas: 1, - Selector: &LabelSelector{ - MatchLabels: Labels, - }, - Template: &Template{ - Metadata: Metadata, - PodSpec: &PodSpec{ - Containers: []Container{{ - Name: name, - Image: DefaultImage, - Env: []EnvVar{env}, - Command: []string{}, - Ports: []ContainerPort{{ - Name: "service-port", - ContainerPort: 8080, - }}, - ReadinessProbe: &Probe{ - TCPSocket: &TCPSocketAction{ - Port: 8080, - }, - PeriodSeconds: 10, - InitialDelaySeconds: 10, - }, + return &Resource{ + Kind: "service", + Name: metadata.Name, + Value: &Service{ + Metadata: metadata, + Spec: &ServiceSpec{ + Type: "ClusterIP", + Selector: labels, + Ports: []ServicePort{{ + "service-port", port, "", }}, }, }, } +} - return &Deployment{ - Metadata: Metadata, - Spec: Spec, +// NewDeployment returns default micro kubernetes deployment definition +func NewDeployment(s *runtime.Service, opts *runtime.CreateOptions) *Resource { + labels := map[string]string{ + "name": Format(s.Name), + "version": Format(s.Version), + "micro": Format(opts.Type), + } + + // attach our values to the deployment; name, version, source + annotations := map[string]string{ + "name": s.Name, + "version": s.Version, + "source": s.Source, + } + for k, v := range s.Metadata { + annotations[k] = v + } + + // construct the metadata for the deployment + metadata := &Metadata{ + Name: fmt.Sprintf("%v-%v", Format(s.Name), Format(s.Version)), + Namespace: Format(opts.Namespace), + Version: Format(s.Version), + Labels: labels, + Annotations: annotations, + } + + // set the image + image := opts.Image + if len(image) == 0 { + image = DefaultImage + } + + // pass the env vars + env := make([]EnvVar, 0, len(opts.Env)) + for _, evar := range opts.Env { + if comps := strings.Split(evar, "="); len(comps) == 2 { + env = append(env, EnvVar{Name: comps[0], Value: comps[1]}) + } + } + + // pass the secrets + for key := range opts.Secrets { + env = append(env, EnvVar{ + Name: key, + ValueFrom: &EnvVarSource{ + SecretKeyRef: &SecretKeySelector{ + Name: metadata.Name, + Key: key, + }, + }, + }) + } + + // parse resource limits + var resReqs *ResourceRequirements + if opts.Resources != nil { + resReqs = &ResourceRequirements{Limits: &ResourceLimits{}} + + if opts.Resources.CPU > 0 { + resReqs.Limits.CPU = fmt.Sprintf("%vm", opts.Resources.CPU) + } + if opts.Resources.Mem > 0 { + resReqs.Limits.Memory = fmt.Sprintf("%vMi", opts.Resources.Mem) + } + if opts.Resources.Disk > 0 { + resReqs.Limits.EphemeralStorage = fmt.Sprintf("%vMi", opts.Resources.Disk) + } + } + + // parse the port option + port := DefaultPort + if len(opts.Port) > 0 { + port, _ = strconv.Atoi(opts.Port) + } + + return &Resource{ + Kind: "deployment", + Name: metadata.Name, + Value: &Deployment{ + Metadata: metadata, + Spec: &DeploymentSpec{ + Replicas: 1, + Selector: &LabelSelector{ + MatchLabels: labels, + }, + Template: &Template{ + Metadata: metadata, + PodSpec: &PodSpec{ + ServiceAccountName: opts.ServiceAccount, + Containers: []Container{{ + Name: Format(s.Name), + Image: image, + Env: env, + Command: opts.Command, + Args: opts.Args, + Ports: []ContainerPort{{ + Name: "service-port", + ContainerPort: port, + }}, + ReadinessProbe: &Probe{ + TCPSocket: &TCPSocketAction{ + Port: port, + }, + PeriodSeconds: 10, + InitialDelaySeconds: 10, + }, + Resources: resReqs, + }}, + }, + }, + }, + }, } } diff --git a/util/kubernetes/client/options.go b/util/kubernetes/client/options.go index dd6b32a5..6ec7be5f 100644 --- a/util/kubernetes/client/options.go +++ b/util/kubernetes/client/options.go @@ -53,14 +53,14 @@ func WatchParams(p map[string]string) WatchOption { // CreateNamespace sets the namespace for creating a resource func CreateNamespace(ns string) CreateOption { return func(o *CreateOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } // GetNamespace sets the namespace for getting a resource func GetNamespace(ns string) GetOption { return func(o *GetOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } @@ -74,34 +74,34 @@ func GetLabels(ls map[string]string) GetOption { // UpdateNamespace sets the namespace for updating a resource func UpdateNamespace(ns string) UpdateOption { return func(o *UpdateOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } // DeleteNamespace sets the namespace for deleting a resource func DeleteNamespace(ns string) DeleteOption { return func(o *DeleteOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } // ListNamespace sets the namespace for listing resources func ListNamespace(ns string) ListOption { return func(o *ListOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } // LogNamespace sets the namespace for logging a resource func LogNamespace(ns string) LogOption { return func(o *LogOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } // WatchNamespace sets the namespace for watching a resource func WatchNamespace(ns string) WatchOption { return func(o *WatchOptions) { - o.Namespace = SerializeResourceName(ns) + o.Namespace = Format(ns) } } diff --git a/util/kubernetes/client/templates.go b/util/kubernetes/client/templates.go index d2b4f6f6..aeb48618 100644 --- a/util/kubernetes/client/templates.go +++ b/util/kubernetes/client/templates.go @@ -84,7 +84,7 @@ spec: - {{.}} {{- end }} image: {{ .Image }} - imagePullPolicy: Always + imagePullPolicy: IfNotPresent ports: {{- with .Ports }} {{- range . }} diff --git a/util/kubernetes/client/util_test.go b/util/kubernetes/client/util_test.go index b7282f2d..60b783b1 100644 --- a/util/kubernetes/client/util_test.go +++ b/util/kubernetes/client/util_test.go @@ -3,23 +3,23 @@ package client import ( "bytes" "testing" + + "github.com/micro/go-micro/v3/runtime" ) func TestTemplates(t *testing.T) { - name := "foo" - version := "123" - typ := "service" - namespace := "default" + srv := &runtime.Service{Name: "foo", Version: "123"} + opts := &runtime.CreateOptions{Type: "service", Namespace: "default"} // Render default service - s := NewService(name, version, typ, namespace) + s := NewService(srv, opts) bs := new(bytes.Buffer) if err := renderTemplate(templates["service"], bs, s); err != nil { t.Errorf("Failed to render kubernetes service: %v", err) } // Render default deployment - d := NewDeployment(name, version, typ, namespace) + d := NewDeployment(srv, opts) bd := new(bytes.Buffer) if err := renderTemplate(templates["deployment"], bd, d); err != nil { t.Errorf("Failed to render kubernetes deployment: %v", err)