692b27578c
* Add context option to runtime; Add dynamic namespace to kubectl client * Add namespace runtime arg * Fixes & Debugging * Pass options in k8s runtime * Set namespace on k8s resources * Additional Logging * More debugging * Remove Debugging * Ensure namespace exists * Add debugging * Refactor namespaceExists check * Fix * Fix * Fix * Fix * Change the way we check for namespace * Fix * Tidying Up * Fix Test * Fix merge bugs * Serialize k8s namespaces * Add namespace to watch * Serialize namespace when creating k8s namespace Co-authored-by: Ben Toogood <ben@micro.mu> Co-authored-by: Asim Aslam <asim@aslam.me>
372 lines
8.1 KiB
Go
372 lines
8.1 KiB
Go
// Package client provides an implementation of a restricted subset of kubernetes API client
|
|
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"errors"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"github.com/micro/go-micro/v2/logger"
|
|
"github.com/micro/go-micro/v2/util/kubernetes/api"
|
|
)
|
|
|
|
var (
|
|
// path to kubernetes service account token
|
|
serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
|
|
// ErrReadNamespace is returned when the names could not be read from service account
|
|
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
|
|
// DefaultImage is default micro image
|
|
DefaultImage = "micro/go-micro"
|
|
// DefaultNamespace is the default k8s namespace
|
|
DefaultNamespace = "default"
|
|
)
|
|
|
|
// Client ...
|
|
type client struct {
|
|
opts *api.Options
|
|
}
|
|
|
|
// Kubernetes client
|
|
type Client interface {
|
|
// Create creates new API resource
|
|
Create(*Resource, ...CreateOption) error
|
|
// Get queries API resrouces
|
|
Get(*Resource, ...GetOption) error
|
|
// Update patches existing API object
|
|
Update(*Resource, ...UpdateOption) error
|
|
// Delete deletes API resource
|
|
Delete(*Resource, ...DeleteOption) error
|
|
// List lists API resources
|
|
List(*Resource, ...ListOption) error
|
|
// Log gets log for a pod
|
|
Log(*Resource, ...LogOption) (io.ReadCloser, error)
|
|
// Watch for events
|
|
Watch(*Resource, ...WatchOption) (Watcher, error)
|
|
}
|
|
|
|
// Create creates new API object
|
|
func (c *client) Create(r *Resource, opts ...CreateOption) error {
|
|
var options CreateOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
b := new(bytes.Buffer)
|
|
if err := renderTemplate(r.Kind, b, r.Value); err != nil {
|
|
return err
|
|
}
|
|
|
|
return api.NewRequest(c.opts).
|
|
Post().
|
|
SetHeader("Content-Type", "application/yaml").
|
|
Namespace(options.Namespace).
|
|
Resource(r.Kind).
|
|
Body(b).
|
|
Do().
|
|
Error()
|
|
}
|
|
|
|
var (
|
|
nameRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
|
|
)
|
|
|
|
// SerializeResourceName removes all spacial chars from a string so it
|
|
// can be used as a k8s resource name
|
|
func SerializeResourceName(ns string) string {
|
|
return nameRegex.ReplaceAllString(ns, "-")
|
|
}
|
|
|
|
// Get queries API objects and stores the result in r
|
|
func (c *client) Get(r *Resource, opts ...GetOption) error {
|
|
var options GetOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
return api.NewRequest(c.opts).
|
|
Get().
|
|
Resource(r.Kind).
|
|
Namespace(options.Namespace).
|
|
Params(&api.Params{LabelSelector: options.Labels}).
|
|
Do().
|
|
Into(r.Value)
|
|
}
|
|
|
|
// Log returns logs for a pod
|
|
func (c *client) Log(r *Resource, opts ...LogOption) (io.ReadCloser, error) {
|
|
var options LogOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
req := api.NewRequest(c.opts).
|
|
Get().
|
|
Resource(r.Kind).
|
|
SubResource("log").
|
|
Name(r.Name).
|
|
Namespace(options.Namespace)
|
|
|
|
if options.Params != nil {
|
|
req.Params(&api.Params{Additional: options.Params})
|
|
}
|
|
|
|
resp, err := req.Raw()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
resp.Body.Close()
|
|
return nil, errors.New(resp.Request.URL.String() + ": " + resp.Status)
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// Update updates API object
|
|
func (c *client) Update(r *Resource, opts ...UpdateOption) error {
|
|
var options UpdateOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
req := api.NewRequest(c.opts).
|
|
Patch().
|
|
SetHeader("Content-Type", "application/strategic-merge-patch+json").
|
|
Resource(r.Kind).
|
|
Name(r.Name).
|
|
Namespace(options.Namespace)
|
|
|
|
switch r.Kind {
|
|
case "service":
|
|
req.Body(r.Value.(*Service))
|
|
case "deployment":
|
|
req.Body(r.Value.(*Deployment))
|
|
case "pod":
|
|
req.Body(r.Value.(*Pod))
|
|
default:
|
|
return errors.New("unsupported resource")
|
|
}
|
|
|
|
return req.Do().Error()
|
|
}
|
|
|
|
// Delete removes API object
|
|
func (c *client) Delete(r *Resource, opts ...DeleteOption) error {
|
|
var options DeleteOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
return api.NewRequest(c.opts).
|
|
Delete().
|
|
Resource(r.Kind).
|
|
Name(r.Name).
|
|
Namespace(options.Namespace).
|
|
Do().
|
|
Error()
|
|
}
|
|
|
|
// List lists API objects and stores the result in r
|
|
func (c *client) List(r *Resource, opts ...ListOption) error {
|
|
var options ListOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
labels := map[string]string{
|
|
"micro": "service",
|
|
}
|
|
|
|
return c.Get(r, GetLabels(labels), GetNamespace(options.Namespace))
|
|
}
|
|
|
|
// Watch returns an event stream
|
|
func (c *client) Watch(r *Resource, opts ...WatchOption) (Watcher, error) {
|
|
var options WatchOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
// set the watch param
|
|
params := &api.Params{Additional: map[string]string{
|
|
"watch": "true",
|
|
}}
|
|
|
|
// get options params
|
|
if options.Params != nil {
|
|
for k, v := range options.Params {
|
|
params.Additional[k] = v
|
|
}
|
|
}
|
|
|
|
req := api.NewRequest(c.opts).
|
|
Get().
|
|
Resource(r.Kind).
|
|
Name(r.Name).
|
|
Namespace(options.Namespace).
|
|
Params(params)
|
|
|
|
return newWatcher(req)
|
|
}
|
|
|
|
// NewService returns default micro kubernetes service definition
|
|
func NewService(name, version, typ, namespace string) *Service {
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("kubernetes default service: name: %s, version: %s", name, version)
|
|
}
|
|
|
|
Labels := map[string]string{
|
|
"name": name,
|
|
"version": version,
|
|
"micro": typ,
|
|
}
|
|
|
|
svcName := name
|
|
if len(version) > 0 {
|
|
// API service object name joins name and version over "-"
|
|
svcName = strings.Join([]string{name, version}, "-")
|
|
}
|
|
|
|
Metadata := &Metadata{
|
|
Name: svcName,
|
|
Namespace: SerializeResourceName(namespace),
|
|
Version: version,
|
|
Labels: Labels,
|
|
}
|
|
|
|
Spec := &ServiceSpec{
|
|
Type: "ClusterIP",
|
|
Selector: Labels,
|
|
Ports: []ServicePort{{
|
|
"service-port", 8080, "",
|
|
}},
|
|
}
|
|
|
|
return &Service{
|
|
Metadata: Metadata,
|
|
Spec: Spec,
|
|
}
|
|
}
|
|
|
|
// NewService returns default micro kubernetes deployment definition
|
|
func NewDeployment(name, version, typ, namespace string) *Deployment {
|
|
if logger.V(logger.TraceLevel, logger.DefaultLogger) {
|
|
logger.Tracef("kubernetes default deployment: name: %s, version: %s", name, version)
|
|
}
|
|
|
|
Labels := map[string]string{
|
|
"name": name,
|
|
"version": version,
|
|
"micro": typ,
|
|
}
|
|
|
|
depName := name
|
|
if len(version) > 0 {
|
|
// API deployment object name joins name and version over "-"
|
|
depName = strings.Join([]string{name, version}, "-")
|
|
}
|
|
|
|
Metadata := &Metadata{
|
|
Name: depName,
|
|
Namespace: SerializeResourceName(namespace),
|
|
Version: version,
|
|
Labels: Labels,
|
|
Annotations: map[string]string{},
|
|
}
|
|
|
|
// enable go modules by default
|
|
env := EnvVar{
|
|
Name: "GO111MODULE",
|
|
Value: "on",
|
|
}
|
|
|
|
Spec := &DeploymentSpec{
|
|
Replicas: 1,
|
|
Selector: &LabelSelector{
|
|
MatchLabels: Labels,
|
|
},
|
|
Template: &Template{
|
|
Metadata: Metadata,
|
|
PodSpec: &PodSpec{
|
|
Containers: []Container{{
|
|
Name: name,
|
|
Image: DefaultImage,
|
|
Env: []EnvVar{env},
|
|
Command: []string{"go", "run", "."},
|
|
Ports: []ContainerPort{{
|
|
Name: "service-port",
|
|
ContainerPort: 8080,
|
|
}},
|
|
}},
|
|
},
|
|
},
|
|
}
|
|
|
|
return &Deployment{
|
|
Metadata: Metadata,
|
|
Spec: Spec,
|
|
}
|
|
}
|
|
|
|
// NewLocalClient returns a client that can be used with `kubectl proxy`
|
|
func NewLocalClient(hosts ...string) *client {
|
|
if len(hosts) == 0 {
|
|
hosts[0] = "http://localhost:8001"
|
|
}
|
|
return &client{
|
|
opts: &api.Options{
|
|
Client: http.DefaultClient,
|
|
Host: hosts[0],
|
|
Namespace: "default",
|
|
},
|
|
}
|
|
}
|
|
|
|
// NewClusterClient creates a Kubernetes client for use from within a k8s pod.
|
|
func NewClusterClient() *client {
|
|
host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT")
|
|
|
|
s, err := os.Stat(serviceAccountPath)
|
|
if err != nil {
|
|
logger.Fatal(err)
|
|
}
|
|
if s == nil || !s.IsDir() {
|
|
logger.Fatal(errors.New("service account not found"))
|
|
}
|
|
|
|
token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
|
|
if err != nil {
|
|
logger.Fatal(err)
|
|
}
|
|
t := string(token)
|
|
|
|
crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
|
|
if err != nil {
|
|
logger.Fatal(err)
|
|
}
|
|
|
|
c := &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{
|
|
RootCAs: crt,
|
|
},
|
|
DisableCompression: true,
|
|
},
|
|
}
|
|
|
|
return &client{
|
|
opts: &api.Options{
|
|
Client: c,
|
|
Host: host,
|
|
BearerToken: &t,
|
|
Namespace: DefaultNamespace,
|
|
},
|
|
}
|
|
}
|