436 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			436 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package client implements a client for a restricted subset of the Kubernetes API
 | 
						|
package client
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"crypto/tls"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"io/ioutil"
 | 
						|
	"net/http"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"regexp"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/v3/logger"
 | 
						|
	"github.com/micro/go-micro/v3/runtime"
 | 
						|
	"github.com/micro/go-micro/v3/util/kubernetes/api"
 | 
						|
)
 | 
						|
 | 
						|
var (
	// serviceAccountPath is the directory from which the in-cluster client
	// reads the kubernetes service account files (token and ca.crt)
	serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"

	// ErrReadNamespace is returned when the namespace could not be read
	// from the service account
	ErrReadNamespace = errors.New("Could not read namespace from service account secret")

	// DefaultImage is the container image used when none is specified
	DefaultImage = "micro/go-micro"

	// DefaultNamespace is the default k8s namespace
	DefaultNamespace = "default"

	// DefaultPort is the port exposed on a service when none is specified
	DefaultPort = 8080
)
 | 
						|
 | 
						|
// Client ...
 | 
						|
type client struct {
 | 
						|
	opts *api.Options
 | 
						|
}
 | 
						|
 | 
						|
// Kubernetes client
 | 
						|
type Client interface {
 | 
						|
	// Create creates new API resource
 | 
						|
	Create(*Resource, ...CreateOption) error
 | 
						|
	// Get queries API resources
 | 
						|
	Get(*Resource, ...GetOption) error
 | 
						|
	// Update patches existing API object
 | 
						|
	Update(*Resource, ...UpdateOption) error
 | 
						|
	// Delete deletes API resource
 | 
						|
	Delete(*Resource, ...DeleteOption) error
 | 
						|
	// List lists API resources
 | 
						|
	List(*Resource, ...ListOption) error
 | 
						|
	// Log gets log for a pod
 | 
						|
	Log(*Resource, ...LogOption) (io.ReadCloser, error)
 | 
						|
	// Watch for events
 | 
						|
	Watch(*Resource, ...WatchOption) (Watcher, error)
 | 
						|
}
 | 
						|
 | 
						|
// Create creates new API object
 | 
						|
func (c *client) Create(r *Resource, opts ...CreateOption) error {
 | 
						|
	options := CreateOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	b := new(bytes.Buffer)
 | 
						|
	if err := renderTemplate(r.Kind, b, r.Value); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	return api.NewRequest(c.opts).
 | 
						|
		Post().
 | 
						|
		SetHeader("Content-Type", "application/yaml").
 | 
						|
		Namespace(options.Namespace).
 | 
						|
		Resource(r.Kind).
 | 
						|
		Body(b).
 | 
						|
		Do().
 | 
						|
		Error()
 | 
						|
}
 | 
						|
 | 
						|
// nameRegex matches every run of characters that are not ASCII letters or digits
var nameRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
 | 
						|
 | 
						|
// Get queries API objects and stores the result in r
 | 
						|
func (c *client) Get(r *Resource, opts ...GetOption) error {
 | 
						|
	options := GetOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	return api.NewRequest(c.opts).
 | 
						|
		Get().
 | 
						|
		Resource(r.Kind).
 | 
						|
		Namespace(options.Namespace).
 | 
						|
		Params(&api.Params{LabelSelector: options.Labels}).
 | 
						|
		Do().
 | 
						|
		Into(r.Value)
 | 
						|
}
 | 
						|
 | 
						|
// Log returns logs for a pod
 | 
						|
func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) {
 | 
						|
	options := LogOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	req := api.NewRequest(c.opts).
 | 
						|
		Get().
 | 
						|
		Resource(r.Kind).
 | 
						|
		SubResource("log").
 | 
						|
		Name(r.Name).
 | 
						|
		Namespace(options.Namespace)
 | 
						|
 | 
						|
	if options.Params != nil {
 | 
						|
		req.Params(&api.Params{Additional: options.Params})
 | 
						|
	}
 | 
						|
 | 
						|
	resp, err := req.Raw()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
 | 
						|
		resp.Body.Close()
 | 
						|
		return nil, errors.New(resp.Request.URL.String() + ": " + resp.Status)
 | 
						|
	}
 | 
						|
	return resp.Body, nil
 | 
						|
}
 | 
						|
 | 
						|
// Update updates API object
 | 
						|
func (c *client) Update(r *Resource, opts ...UpdateOption) error {
 | 
						|
	options := UpdateOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	req := api.NewRequest(c.opts).
 | 
						|
		Patch().
 | 
						|
		SetHeader("Content-Type", "application/strategic-merge-patch+json").
 | 
						|
		Resource(r.Kind).
 | 
						|
		Name(r.Name).
 | 
						|
		Namespace(options.Namespace)
 | 
						|
 | 
						|
	switch r.Kind {
 | 
						|
	case "service":
 | 
						|
		req.Body(r.Value.(*Service))
 | 
						|
	case "deployment":
 | 
						|
		req.Body(r.Value.(*Deployment))
 | 
						|
	case "pod":
 | 
						|
		req.Body(r.Value.(*Pod))
 | 
						|
	case "networkpolicy", "networkpolicies":
 | 
						|
		req.Body(r.Value.(*NetworkPolicy))
 | 
						|
	default:
 | 
						|
		return errors.New("unsupported resource")
 | 
						|
	}
 | 
						|
 | 
						|
	return req.Do().Error()
 | 
						|
}
 | 
						|
 | 
						|
// Delete removes API object
 | 
						|
func (c *client) Delete(r *Resource, opts ...DeleteOption) error {
 | 
						|
	options := DeleteOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	return api.NewRequest(c.opts).
 | 
						|
		Delete().
 | 
						|
		Resource(r.Kind).
 | 
						|
		Name(r.Name).
 | 
						|
		Namespace(options.Namespace).
 | 
						|
		Do().
 | 
						|
		Error()
 | 
						|
}
 | 
						|
 | 
						|
// List lists API objects and stores the result in r
 | 
						|
func (c *client) List(r *Resource, opts ...ListOption) error {
 | 
						|
	options := ListOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	return c.Get(r, GetNamespace(options.Namespace))
 | 
						|
}
 | 
						|
 | 
						|
// Watch returns an event stream
 | 
						|
func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) {
 | 
						|
	options := WatchOptions{
 | 
						|
		Namespace: c.opts.Namespace,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	// set the watch param
 | 
						|
	params := &api.Params{Additional: map[string]string{
 | 
						|
		"watch": "true",
 | 
						|
	}}
 | 
						|
 | 
						|
	// get options params
 | 
						|
	if options.Params != nil {
 | 
						|
		for k, v := range options.Params {
 | 
						|
			params.Additional[k] = v
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	req := api.NewRequest(c.opts).
 | 
						|
		Get().
 | 
						|
		Resource(r.Kind).
 | 
						|
		Name(r.Name).
 | 
						|
		Namespace(options.Namespace).
 | 
						|
		Params(params)
 | 
						|
 | 
						|
	return newWatcher(req)
 | 
						|
}
 | 
						|
 | 
						|
// NewService returns default micro kubernetes service definition
 | 
						|
func NewService(s *runtime.Service, opts *runtime.CreateOptions) *Resource {
 | 
						|
	labels := map[string]string{
 | 
						|
		"name":    Format(s.Name),
 | 
						|
		"version": Format(s.Version),
 | 
						|
		"micro":   Format(opts.Type),
 | 
						|
	}
 | 
						|
 | 
						|
	metadata := &Metadata{
 | 
						|
		Name:      Format(s.Name),
 | 
						|
		Namespace: Format(opts.Namespace),
 | 
						|
		Version:   Format(s.Version),
 | 
						|
		Labels:    labels,
 | 
						|
	}
 | 
						|
 | 
						|
	port := DefaultPort
 | 
						|
	if len(opts.Port) > 0 {
 | 
						|
		port, _ = strconv.Atoi(opts.Port)
 | 
						|
	}
 | 
						|
 | 
						|
	return &Resource{
 | 
						|
		Kind: "service",
 | 
						|
		Name: metadata.Name,
 | 
						|
		Value: &Service{
 | 
						|
			Metadata: metadata,
 | 
						|
			Spec: &ServiceSpec{
 | 
						|
				Type:     "ClusterIP",
 | 
						|
				Selector: labels,
 | 
						|
				Ports: []ServicePort{{
 | 
						|
					"service-port", port, "",
 | 
						|
				}},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewDeployment returns default micro kubernetes deployment definition
 | 
						|
func NewDeployment(s *runtime.Service, opts *runtime.CreateOptions) *Resource {
 | 
						|
	labels := map[string]string{
 | 
						|
		"name":    Format(s.Name),
 | 
						|
		"version": Format(s.Version),
 | 
						|
		"micro":   Format(opts.Type),
 | 
						|
	}
 | 
						|
 | 
						|
	// attach our values to the deployment; name, version, source
 | 
						|
	annotations := map[string]string{
 | 
						|
		"name":    s.Name,
 | 
						|
		"version": s.Version,
 | 
						|
		"source":  s.Source,
 | 
						|
	}
 | 
						|
	for k, v := range s.Metadata {
 | 
						|
		annotations[k] = v
 | 
						|
	}
 | 
						|
 | 
						|
	// construct the metadata for the deployment
 | 
						|
	metadata := &Metadata{
 | 
						|
		Name:        fmt.Sprintf("%v-%v", Format(s.Name), Format(s.Version)),
 | 
						|
		Namespace:   Format(opts.Namespace),
 | 
						|
		Version:     Format(s.Version),
 | 
						|
		Labels:      labels,
 | 
						|
		Annotations: annotations,
 | 
						|
	}
 | 
						|
 | 
						|
	// set the image
 | 
						|
	image := opts.Image
 | 
						|
	if len(image) == 0 {
 | 
						|
		image = DefaultImage
 | 
						|
	}
 | 
						|
 | 
						|
	// pass the env vars
 | 
						|
	env := make([]EnvVar, 0, len(opts.Env))
 | 
						|
	for _, evar := range opts.Env {
 | 
						|
		if comps := strings.Split(evar, "="); len(comps) == 2 {
 | 
						|
			env = append(env, EnvVar{Name: comps[0], Value: comps[1]})
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// pass the secrets
 | 
						|
	for key := range opts.Secrets {
 | 
						|
		env = append(env, EnvVar{
 | 
						|
			Name: key,
 | 
						|
			ValueFrom: &EnvVarSource{
 | 
						|
				SecretKeyRef: &SecretKeySelector{
 | 
						|
					Name: metadata.Name,
 | 
						|
					Key:  key,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	// parse resource limits
 | 
						|
	var resReqs *ResourceRequirements
 | 
						|
	if opts.Resources != nil {
 | 
						|
		resReqs = &ResourceRequirements{Limits: &ResourceLimits{}}
 | 
						|
 | 
						|
		if opts.Resources.CPU > 0 {
 | 
						|
			resReqs.Limits.CPU = fmt.Sprintf("%vm", opts.Resources.CPU)
 | 
						|
		}
 | 
						|
		if opts.Resources.Mem > 0 {
 | 
						|
			resReqs.Limits.Memory = fmt.Sprintf("%vMi", opts.Resources.Mem)
 | 
						|
		}
 | 
						|
		if opts.Resources.Disk > 0 {
 | 
						|
			resReqs.Limits.EphemeralStorage = fmt.Sprintf("%vMi", opts.Resources.Disk)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// parse the port option
 | 
						|
	port := DefaultPort
 | 
						|
	if len(opts.Port) > 0 {
 | 
						|
		port, _ = strconv.Atoi(opts.Port)
 | 
						|
	}
 | 
						|
 | 
						|
	return &Resource{
 | 
						|
		Kind: "deployment",
 | 
						|
		Name: metadata.Name,
 | 
						|
		Value: &Deployment{
 | 
						|
			Metadata: metadata,
 | 
						|
			Spec: &DeploymentSpec{
 | 
						|
				Replicas: 1,
 | 
						|
				Selector: &LabelSelector{
 | 
						|
					MatchLabels: labels,
 | 
						|
				},
 | 
						|
				Template: &Template{
 | 
						|
					Metadata: metadata,
 | 
						|
					PodSpec: &PodSpec{
 | 
						|
						ServiceAccountName: opts.ServiceAccount,
 | 
						|
						Containers: []Container{{
 | 
						|
							Name:    Format(s.Name),
 | 
						|
							Image:   image,
 | 
						|
							Env:     env,
 | 
						|
							Command: opts.Command,
 | 
						|
							Args:    opts.Args,
 | 
						|
							Ports: []ContainerPort{{
 | 
						|
								Name:          "service-port",
 | 
						|
								ContainerPort: port,
 | 
						|
							}},
 | 
						|
							ReadinessProbe: &Probe{
 | 
						|
								TCPSocket: &TCPSocketAction{
 | 
						|
									Port: port,
 | 
						|
								},
 | 
						|
								PeriodSeconds:       10,
 | 
						|
								InitialDelaySeconds: 10,
 | 
						|
							},
 | 
						|
							Resources: resReqs,
 | 
						|
						}},
 | 
						|
					},
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewLocalClient returns a client that can be used with `kubectl proxy`
 | 
						|
func NewLocalClient(hosts ...string) *client {
 | 
						|
	if len(hosts) == 0 {
 | 
						|
		hosts[0] = "http://localhost:8001"
 | 
						|
	}
 | 
						|
	return &client{
 | 
						|
		opts: &api.Options{
 | 
						|
			Client:    http.DefaultClient,
 | 
						|
			Host:      hosts[0],
 | 
						|
			Namespace: "default",
 | 
						|
		},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// NewClusterClient creates a Kubernetes client for use from within a k8s pod.
 | 
						|
func NewClusterClient() *client {
 | 
						|
	host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT")
 | 
						|
 | 
						|
	s, err := os.Stat(serviceAccountPath)
 | 
						|
	if err != nil {
 | 
						|
		logger.Fatal(err)
 | 
						|
	}
 | 
						|
	if s == nil || !s.IsDir() {
 | 
						|
		logger.Fatal(errors.New("service account not found"))
 | 
						|
	}
 | 
						|
 | 
						|
	token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
 | 
						|
	if err != nil {
 | 
						|
		logger.Fatal(err)
 | 
						|
	}
 | 
						|
	t := string(token)
 | 
						|
 | 
						|
	crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
 | 
						|
	if err != nil {
 | 
						|
		logger.Fatal(err)
 | 
						|
	}
 | 
						|
 | 
						|
	c := &http.Client{
 | 
						|
		Transport: &http.Transport{
 | 
						|
			TLSClientConfig: &tls.Config{
 | 
						|
				RootCAs: crt,
 | 
						|
			},
 | 
						|
			DisableCompression: true,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	return &client{
 | 
						|
		opts: &api.Options{
 | 
						|
			Client:      c,
 | 
						|
			Host:        host,
 | 
						|
			BearerToken: &t,
 | 
						|
			Namespace:   DefaultNamespace,
 | 
						|
		},
 | 
						|
	}
 | 
						|
}
 |