runtime/kubernetes: rewrite to improve support of multiple versions of a single service (#2035)

* wip: refactor kubernetes package

* runtime/kubernetes: fix invalid labels

* runtime/kubernetes: handle delete service not found error

* Misc Fixes

* runtime: add ServiceAccount option

* router/static: return noop table

* add kubernetes router

* runtime: add port option

* store/file: set directory

* store/file: pass options to blob store

* Revert changes to static router

* Fix merge error

* runtime/kubernetes: Debug => Error logs

* runtime/kubernetes: fix double if
This commit is contained in:
ben-toogood
2020-10-09 13:28:15 +01:00
committed by GitHub
parent c701f96a09
commit dad05be95e
9 changed files with 614 additions and 794 deletions

View File

@@ -2,9 +2,7 @@
package kubernetes
import (
"encoding/base64"
"fmt"
"strings"
"sync"
"time"
@@ -19,234 +17,20 @@ import (
type action int
type kubernetes struct {
sync.RWMutex
sync.Mutex
// options configure runtime
options runtime.Options
// indicates if we're running
running bool
// client is kubernetes client
client client.Client
// namespaces which exist
namespaces []client.Namespace
}
// namespaceExists returns a boolean indicating if a namespace exists
func (k *kubernetes) namespaceExists(name string) (bool, error) {
// populate the cache
if k.namespaces == nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Populating namespace cache")
}
namespaceList := new(client.NamespaceList)
resource := &client.Resource{Kind: "namespace", Value: namespaceList}
if err := k.client.List(resource); err != nil {
return false, err
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Popualted namespace cache successfully with %v items", len(namespaceList.Items))
}
k.namespaces = namespaceList.Items
}
// check if the namespace exists in the cache
for _, n := range k.namespaces {
if n.Metadata.Name == name {
return true, nil
}
}
return false, nil
}
// createNamespace creates a new k8s namespace
func (k *kubernetes) createNamespace(namespace string) error {
ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}}
err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns})
// ignore err already exists
if err != nil && strings.Contains(err.Error(), "already exists") {
logger.Debugf("Ignoring ErrAlreadyExists for namespace %v: %v", namespace, err)
err = nil
}
// add to cache
if err == nil && k.namespaces != nil {
k.namespaces = append(k.namespaces, ns)
}
return err
}
// getService queries kubernetes for micro service
// NOTE: this function is not thread-safe
func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) {
// get the service status
serviceList := new(client.ServiceList)
r := &client.Resource{
Kind: "service",
Value: serviceList,
}
opts = append(opts, client.GetLabels(labels))
// get the service from k8s
if err := k.client.Get(r, opts...); err != nil {
return nil, err
}
// get the deployment status
depList := new(client.DeploymentList)
d := &client.Resource{
Kind: "deployment",
Value: depList,
}
if err := k.client.Get(d, opts...); err != nil {
return nil, err
}
// get the pods from k8s
podList := new(client.PodList)
p := &client.Resource{
Kind: "pod",
Value: podList,
}
if err := k.client.Get(p, opts...); err != nil {
return nil, err
}
// service map
svcMap := make(map[string]*service)
// collect info from kubernetes service
for _, kservice := range serviceList.Items {
// name of the service
name := kservice.Metadata.Labels["name"]
// version of the service
version := kservice.Metadata.Labels["version"]
srv := &service{
Service: &runtime.Service{
Name: name,
Version: version,
Metadata: make(map[string]string),
},
kservice: &kservice,
}
// set the address
address := kservice.Spec.ClusterIP
port := kservice.Spec.Ports[0]
srv.Service.Metadata["address"] = fmt.Sprintf("%s:%d", address, port.Port)
// set the type of service
srv.Service.Metadata["type"] = kservice.Metadata.Labels["micro"]
// copy annotations metadata into service metadata
for k, v := range kservice.Metadata.Annotations {
srv.Service.Metadata[k] = v
}
// save as service
svcMap[name+version] = srv
}
// collect additional info from kubernetes deployment
for _, kdep := range depList.Items {
// name of the service
name := kdep.Metadata.Labels["name"]
// versio of the service
version := kdep.Metadata.Labels["version"]
// access existing service map based on name + version
if svc, ok := svcMap[name+version]; ok {
// we're expecting our own service name in metadata
if _, ok := kdep.Metadata.Annotations["name"]; !ok {
continue
}
// set the service name, version and source
// based on existing annotations we stored
svc.Service.Name = kdep.Metadata.Annotations["name"]
svc.Service.Version = kdep.Metadata.Annotations["version"]
svc.Service.Source = kdep.Metadata.Annotations["source"]
// delete from metadata
delete(kdep.Metadata.Annotations, "name")
delete(kdep.Metadata.Annotations, "version")
delete(kdep.Metadata.Annotations, "source")
// copy all annotations metadata into service metadata
for k, v := range kdep.Metadata.Annotations {
svc.Service.Metadata[k] = v
}
// parse out deployment status and inject into service metadata
if len(kdep.Status.Conditions) > 0 {
status := transformStatus(kdep.Status.Conditions[0].Type)
svc.Status(status, nil)
svc.Metadata["started"] = kdep.Status.Conditions[0].LastUpdateTime
} else {
svc.Status(runtime.Unknown, nil)
}
// get the real status
for _, item := range podList.Items {
// check the name
if item.Metadata.Labels["name"] != name {
continue
}
// check the version
if item.Metadata.Labels["version"] != version {
continue
}
status := transformStatus(item.Status.Phase)
// skip if we can't get the container
if len(item.Status.Containers) == 0 {
continue
}
// now try get a deeper status
state := item.Status.Containers[0].State
// set start time
if state.Running != nil {
svc.Metadata["started"] = state.Running.Started
}
// set status from waiting
if v := state.Waiting; v != nil {
status = runtime.Starting
}
svc.Status(status, nil)
}
// save deployment
svc.kdeploy = &kdep
}
}
// collect all the services and return
services := make([]*service, 0, len(serviceList.Items))
for _, service := range svcMap {
services = append(services, service)
}
return services, nil
}
// Init initializes runtime options
func (k *kubernetes) Init(opts ...runtime.Option) error {
k.Lock()
defer k.Unlock()
for _, o := range opts {
o(&k.options)
}
return nil
}
@@ -313,64 +97,57 @@ func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) er
k.Lock()
defer k.Unlock()
options := runtime.CreateOptions{
// parse the options
options := &runtime.CreateOptions{
Type: k.options.Type,
Image: k.options.Image,
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
o(options)
}
// default type if it doesn't exist
if len(options.Type) == 0 {
options.Type = k.options.Type
}
// default the source if it doesn't exist
// default the service's source and version
if len(s.Source) == 0 {
s.Source = k.options.Source
}
if len(s.Version) == 0 {
s.Version = "latest"
}
// ensure the namespace exists
namespace := client.SerializeResourceName(options.Namespace)
// only do this if the namespace is not default
if namespace != "default" {
if exist, err := k.namespaceExists(namespace); err == nil && !exist {
if err := k.createNamespace(namespace); err != nil {
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
logger.Warnf("Error creating namespace %v: %v", namespace, err)
}
return err
}
} else if err != nil {
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
logger.Warnf("Error checking namespace %v exists: %v", namespace, err)
}
return err
}
}
// determine the image from the source and options
options.Image = k.getImage(s, options)
// create a secret for the credentials if some where provided
if len(options.Secrets) > 0 {
if err := k.createCredentials(s, options); err != nil {
if logger.V(logger.WarnLevel, logger.DefaultLogger) {
logger.Warnf("Error generating auth credentials for service: %v", err)
}
return err
}
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Generated auth credentials for service %v", s.Name)
}
if err := k.ensureNamepaceExists(options.Namespace); err != nil {
return nil
}
// create new service
service := newService(s, options)
// create a secret for the deployment
if err := k.createCredentials(s, options); err != nil {
return err
}
// start the service
return service.Start(k.client, client.CreateNamespace(options.Namespace))
// create the deployment
if err := k.client.Create(client.NewDeployment(s, options), client.CreateNamespace(options.Namespace)); err != nil {
if parseError(err).Reason == "AlreadyExists" {
return runtime.ErrAlreadyExists
}
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Runtime failed to create deployment: %v", err)
}
return err
}
// create the service, one could already exist for another version so ignore ErrAlreadyExists
if err := k.client.Create(client.NewService(s, options), client.CreateNamespace(options.Namespace)); err != nil {
if parseError(err).Reason == "AlreadyExists" {
return nil
}
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Runtime failed to create service: %v", err)
}
return err
}
return nil
}
// Read returns all instances of given service
@@ -378,88 +155,94 @@ func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error
k.Lock()
defer k.Unlock()
// set the default labels
labels := map[string]string{}
// parse the options
options := runtime.ReadOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
// construct the query
labels := map[string]string{}
if len(options.Service) > 0 {
labels["name"] = client.Format(options.Service)
}
// add version to labels if a version has been supplied
if len(options.Version) > 0 {
labels["version"] = client.Format(options.Version)
}
if len(options.Type) > 0 {
labels["micro"] = options.Type
labels["micro"] = client.Format(options.Type)
}
srvs, err := k.getService(labels, client.GetNamespace(options.Namespace))
if err != nil {
return nil, err
}
var services []*runtime.Service
for _, service := range srvs {
services = append(services, service.Service)
}
return services, nil
// lookup all the serivces which match this query, if one service has two different versions,
// they'll be returned as two seperate resullts
return k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels))
}
// Update the service in place
func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error {
k.Lock()
defer k.Unlock()
// parse the options
options := runtime.UpdateOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
// construct the query
labels := map[string]string{}
if len(s.Name) > 0 {
labels["name"] = client.Format(s.Name)
}
if len(s.Version) > 0 {
labels["version"] = client.Format(s.Version)
}
// get the existing service
services, err := k.getService(labels, client.GetNamespace(options.Namespace))
if err != nil {
// get the existing deployments
depList := new(client.DeploymentList)
d := &client.Resource{
Kind: "deployment",
Value: depList,
}
depOpts := []client.GetOption{
client.GetNamespace(options.Namespace),
client.GetLabels(labels),
}
if err := k.client.Get(d, depOpts...); err != nil {
return err
} else if len(depList.Items) == 0 {
return runtime.ErrNotFound
}
// update the relevant services
for _, service := range services {
// nil check
if service.kdeploy.Metadata == nil || service.kdeploy.Metadata.Annotations == nil {
md := new(client.Metadata)
md.Annotations = make(map[string]string)
service.kdeploy.Metadata = md
// update the deployments which match the query
for _, dep := range depList.Items {
// the service wan't created by the k8s runtime
if dep.Metadata == nil || dep.Metadata.Annotations == nil {
continue
}
// update metadata
for k, v := range s.Metadata {
service.kdeploy.Metadata.Annotations[k] = v
dep.Metadata.Annotations[k] = v
}
// update build time annotation
service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix())
dep.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix())
// update the service
if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil {
// update the deployment
res := &client.Resource{
Kind: "deployment",
Name: resourceName(s),
Value: &dep,
}
if err := k.client.Update(res, client.UpdateNamespace(options.Namespace)); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Runtime failed to update deployment: %v", err)
}
return err
}
}
@@ -469,6 +252,10 @@ func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) er
// Delete removes a service
func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error {
k.Lock()
defer k.Unlock()
// parse the options
options := runtime.DeleteOptions{
Namespace: client.DefaultNamespace,
}
@@ -476,22 +263,47 @@ func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) er
o(&options)
}
k.Lock()
defer k.Unlock()
// create new kubernetes micro service
service := newService(s, runtime.CreateOptions{
// delete the deployment
dep := client.NewDeployment(s, &runtime.CreateOptions{
Type: k.options.Type,
Namespace: options.Namespace,
})
if err := k.client.Delete(dep, client.DeleteNamespace(options.Namespace)); err != nil {
if err == api.ErrNotFound {
return runtime.ErrNotFound
}
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Runtime failed to delete deployment: %v", err)
}
return err
}
// delete the service credentials
ns := client.DeleteNamespace(options.Namespace)
k.client.Delete(&client.Resource{Name: credentialsName(s), Kind: "secret"}, ns)
// delete the credentials
if err := k.deleteCredentials(s, &runtime.CreateOptions{Namespace: options.Namespace}); err != nil {
return err
}
if err := service.Stop(k.client, ns); err == api.ErrNotFound {
return runtime.ErrNotFound
} else if err != nil {
// if there are more deployments for this service, then don't delete it
labels := map[string]string{}
if len(s.Name) > 0 {
labels["name"] = client.Format(s.Name)
}
// get the existing services. todo: refactor to just get the deployments
services, err := k.getServices(client.GetNamespace(options.Namespace), client.GetLabels(labels))
if err != nil || len(services) > 0 {
return err
}
// delete the service
srv := client.NewService(s, &runtime.CreateOptions{
Type: k.options.Type,
Namespace: options.Namespace,
})
if err := k.client.Delete(srv, client.DeleteNamespace(options.Namespace)); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Runtime failed to delete service: %v", err)
}
return err
}
@@ -500,30 +312,11 @@ func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) er
// Start starts the runtime
func (k *kubernetes) Start() error {
k.Lock()
defer k.Unlock()
// already running
if k.running {
return nil
}
// set running
k.running = true
return nil
}
// Stop shuts down the runtime
func (k *kubernetes) Stop() error {
k.Lock()
defer k.Unlock()
if !k.running {
return nil
}
// set not running
k.running = false
return nil
}
@@ -553,106 +346,3 @@ func NewRuntime(opts ...runtime.Option) runtime.Runtime {
client: client,
}
}
func (k *kubernetes) getImage(s *runtime.Service, options runtime.CreateOptions) string {
// use the image when its specified
if len(options.Image) > 0 {
return options.Image
}
if len(k.options.Image) > 0 {
return k.options.Image
}
return ""
}
func (k *kubernetes) createCredentials(service *runtime.Service, options runtime.CreateOptions) error {
data := make(map[string]string, len(options.Secrets))
for key, value := range options.Secrets {
data[key] = base64.StdEncoding.EncodeToString([]byte(value))
}
// construct the k8s secret object
secret := &client.Secret{
Type: "Opaque",
Data: data,
Metadata: &client.Metadata{
Name: credentialsName(service),
Namespace: options.Namespace,
},
}
// crete the secret in kubernetes
name := credentialsName(service)
return k.client.Create(&client.Resource{
Kind: "secret", Name: name, Value: secret,
}, client.CreateNamespace(options.Namespace))
}
func credentialsName(service *runtime.Service) string {
name := fmt.Sprintf("%v-%v-credentials", service.Name, service.Version)
return client.SerializeResourceName(name)
}
func (k *kubernetes) CreateNamespace(ns string) error {
err := k.client.Create(&client.Resource{
Kind: "namespace",
Value: client.Namespace{
Metadata: &client.Metadata{
Name: ns,
},
},
})
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error creating namespace %v: %v", ns, err)
}
}
return err
}
func (k *kubernetes) DeleteNamespace(ns string) error {
err := k.client.Delete(&client.Resource{
Kind: "namespace",
Name: ns,
})
if err != nil {
if err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error deleting namespace %v: %v", ns, err)
}
}
}
return err
}
// transformStatus takes a deployment status (deploymentcondition.type) and transforms it into a
// runtime service status, e.g. containercreating => starting
func transformStatus(depStatus string) runtime.ServiceStatus {
switch strings.ToLower(depStatus) {
case "pending":
return runtime.Starting
case "containercreating":
return runtime.Starting
case "imagepullbackoff":
return runtime.Error
case "crashloopbackoff":
return runtime.Error
case "error":
return runtime.Error
case "running":
return runtime.Running
case "available":
return runtime.Running
case "succeeded":
return runtime.Stopped
case "failed":
return runtime.Error
case "waiting":
return runtime.Starting
case "terminated":
return runtime.Stopped
default:
return runtime.Unknown
}
}