ce33e3b072
* wip * Implementation of Kubernetes Logger * Missing file * Skip test in Travis
205 lines
4.3 KiB
Go
205 lines
4.3 KiB
Go
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"errors"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
|
|
"github.com/micro/go-micro/util/kubernetes/client/api"
|
|
"github.com/micro/go-micro/util/log"
|
|
)
|
|
|
|
var (
|
|
// path to kubernetes service account token
|
|
serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
|
|
// ErrReadNamespace is returned when the names could not be read from service account
|
|
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
|
|
)
|
|
|
|
// Client ...
|
|
type client struct {
|
|
opts *api.Options
|
|
}
|
|
|
|
// NewLocalDevClient returns a client that can be used with `kubectl proxy` on an optional port
|
|
func NewLocalDevClient(port ...int) *client {
|
|
var p int
|
|
if len(port) > 1 {
|
|
log.Fatal("Expected 0 or 1 port parameters")
|
|
}
|
|
if len(port) == 0 {
|
|
p = 8001
|
|
} else {
|
|
p = port[0]
|
|
}
|
|
return &client{
|
|
opts: &api.Options{
|
|
Client: http.DefaultClient,
|
|
Host: "http://localhost:" + strconv.Itoa(p),
|
|
Namespace: "default",
|
|
},
|
|
}
|
|
}
|
|
|
|
// NewClientInCluster creates a Kubernetes client for use from within a k8s pod.
|
|
func NewClientInCluster() *client {
|
|
host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT")
|
|
|
|
s, err := os.Stat(serviceAccountPath)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
if s == nil || !s.IsDir() {
|
|
log.Fatal(errors.New("service account not found"))
|
|
}
|
|
|
|
token, err := ioutil.ReadFile(path.Join(serviceAccountPath, "token"))
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
t := string(token)
|
|
|
|
ns, err := detectNamespace()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
crt, err := CertPoolFromFile(path.Join(serviceAccountPath, "ca.crt"))
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
c := &http.Client{
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{
|
|
RootCAs: crt,
|
|
},
|
|
DisableCompression: true,
|
|
},
|
|
}
|
|
|
|
return &client{
|
|
opts: &api.Options{
|
|
Client: c,
|
|
Host: host,
|
|
Namespace: ns,
|
|
BearerToken: &t,
|
|
},
|
|
}
|
|
}
|
|
|
|
func detectNamespace() (string, error) {
|
|
nsPath := path.Join(serviceAccountPath, "namespace")
|
|
|
|
// Make sure it's a file and we can read it
|
|
if s, e := os.Stat(nsPath); e != nil {
|
|
return "", e
|
|
} else if s.IsDir() {
|
|
return "", ErrReadNamespace
|
|
}
|
|
|
|
// Read the file, and cast to a string
|
|
if ns, e := ioutil.ReadFile(nsPath); e != nil {
|
|
return string(ns), e
|
|
} else {
|
|
return string(ns), nil
|
|
}
|
|
}
|
|
|
|
// Create creates new API object
|
|
func (c *client) Create(r *Resource) error {
|
|
b := new(bytes.Buffer)
|
|
if err := renderTemplate(r.Kind, b, r.Value); err != nil {
|
|
return err
|
|
}
|
|
|
|
return api.NewRequest(c.opts).
|
|
Post().
|
|
SetHeader("Content-Type", "application/yaml").
|
|
Resource(r.Kind).
|
|
Body(b).
|
|
Do().
|
|
Error()
|
|
}
|
|
|
|
// Get queries API objects and stores the result in r
|
|
func (c *client) Get(r *Resource, labels map[string]string) error {
|
|
return api.NewRequest(c.opts).
|
|
Get().
|
|
Resource(r.Kind).
|
|
Params(&api.Params{LabelSelector: labels}).
|
|
Do().
|
|
Into(r.Value)
|
|
}
|
|
|
|
// Logs returns logs for a pod
|
|
func (c *client) Logs(podName string, options ...LogOption) (io.ReadCloser, error) {
|
|
opts := &LogOptions{}
|
|
for _, o := range options {
|
|
o(opts)
|
|
}
|
|
req := api.NewRequest(c.opts).
|
|
Get().
|
|
Resource("pod").
|
|
SubResource("log").
|
|
Name(podName)
|
|
|
|
if opts.AdditionalParams != nil {
|
|
req.Params(&api.Params{Additional: opts.AdditionalParams})
|
|
}
|
|
|
|
resp, err := req.Raw()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
resp.Body.Close()
|
|
return nil, errors.New(resp.Request.URL.String() + ": " + resp.Status)
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// Update updates API object
|
|
func (c *client) Update(r *Resource) error {
|
|
req := api.NewRequest(c.opts).
|
|
Patch().
|
|
SetHeader("Content-Type", "application/strategic-merge-patch+json").
|
|
Resource(r.Kind).
|
|
Name(r.Name)
|
|
|
|
switch r.Kind {
|
|
case "service":
|
|
req.Body(r.Value.(*Service))
|
|
case "deployment":
|
|
req.Body(r.Value.(*Deployment))
|
|
default:
|
|
return errors.New("unsupported resource")
|
|
}
|
|
|
|
return req.Do().Error()
|
|
}
|
|
|
|
// Delete removes API object
|
|
func (c *client) Delete(r *Resource) error {
|
|
return api.NewRequest(c.opts).
|
|
Delete().
|
|
Resource(r.Kind).
|
|
Name(r.Name).
|
|
Do().
|
|
Error()
|
|
}
|
|
|
|
// List lists API objects and stores the result in r
|
|
func (c *client) List(r *Resource) error {
|
|
labels := map[string]string{
|
|
"micro": "service",
|
|
}
|
|
return c.Get(r, labels)
|
|
}
|