Update the util/kubernetes client to retrieve logs

This commit is contained in:
Jake Sanders 2019-12-17 16:09:51 +00:00
parent 0415ead504
commit 53ca742c66
7 changed files with 85 additions and 9 deletions

View File

@ -9,7 +9,9 @@ type klog struct{}
func (k *klog) Read(...log.ReadOption) []log.Record { return nil } func (k *klog) Read(...log.ReadOption) []log.Record { return nil }
func (k *klog) Write(log.Record) {} func (k *klog) Write(l log.Record) {
write(l)
}
func (k *klog) Stream(stop chan bool) <-chan log.Record { func (k *klog) Stream(stop chan bool) <-chan log.Record {
c := make(chan log.Record) c := make(chan log.Record)

View File

@ -2,6 +2,7 @@ package kubernetes
import ( import (
"bytes" "bytes"
"encoding/json"
"io" "io"
"os" "os"
"testing" "testing"
@ -13,6 +14,7 @@ import (
func TestKubernetes(t *testing.T) { func TestKubernetes(t *testing.T) {
k := New() k := New()
r, w, err := os.Pipe() r, w, err := os.Pipe()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -20,17 +22,22 @@ func TestKubernetes(t *testing.T) {
s := os.Stderr s := os.Stderr
os.Stderr = w os.Stderr = w
meta := make(map[string]string) meta := make(map[string]string)
meta["foo"] = "bar" write := log.Record{
k.Write(log.Record{
Timestamp: time.Unix(0, 0), Timestamp: time.Unix(0, 0),
Value: "Test log entry", Value: "Test log entry",
Metadata: meta, Metadata: meta,
}) }
meta["foo"] = "bar"
k.Write(write)
b := &bytes.Buffer{} b := &bytes.Buffer{}
w.Close() w.Close()
io.Copy(b, r) io.Copy(b, r)
os.Stderr = s os.Stderr = s
assert.Equal(t, "Test log entry", b.String(), "Write was not equal") var read log.Record
if err := json.Unmarshal(b.Bytes(), &read); err != nil {
t.Fatalf("json.Unmarshal failed: %s", err.Error())
}
assert.Equal(t, write, read, "Write was not equal")
assert.Nil(t, k.Read(), "Read should be unimplemented") assert.Nil(t, k.Read(), "Read should be unimplemented")

View File

@ -0,0 +1,15 @@
package kubernetes
import "github.com/micro/go-micro/debug/log"
import (
"encoding/json"
"fmt"
"os"
)
func write(l log.Record) {
if m, err := json.Marshal(l); err == nil {
fmt.Fprintf(os.Stderr, "%s", m)
}
}

View File

@ -29,14 +29,14 @@ type Log interface {
// Record is log record entry // Record is log record entry
type Record struct { type Record struct {
// Timestamp of logged event // Timestamp of logged event
Timestamp time.Time Timestamp time.Time `json:"time"`
// Value contains log entry // Value contains log entry
Value interface{} Value interface{} `json:"value"`
// Metadata to enrich log record // Metadata to enrich log record
Metadata map[string]string Metadata map[string]string `json:"metadata"`
} }
// level is a log level // Level is a log level
type Level int type Level int
const ( const (

View File

@ -111,6 +111,17 @@ var tests = []testcase{
URI: "/apis/apps/v1/namespaces/default/deployments/baz", URI: "/apis/apps/v1/namespaces/default/deployments/baz",
Header: map[string]string{"foo": "bar"}, Header: map[string]string{"foo": "bar"},
}, },
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).
Get().
Resource("pod").
SubResource("log").
Name("foolog")
},
Method: "GET",
URI: "/api/v1/namespaces/default/pods/foolog/log",
},
} }
var wrappedHandler = func(test *testcase, t *testing.T) http.HandlerFunc { var wrappedHandler = func(test *testcase, t *testing.T) http.HandlerFunc {

View File

@ -23,6 +23,7 @@ type Request struct {
resource string resource string
resourceName *string resourceName *string
subResource *string
body io.Reader body io.Reader
err error err error
@ -79,6 +80,13 @@ func (r *Request) Resource(s string) *Request {
return r return r
} }
// SubResource sets a subresource on a resource,
// e.g. pods/log for pod logs
func (r *Request) SubResource(s string) *Request {
r.subResource = &s
return r
}
// Name is for targeting a specific resource by id // Name is for targeting a specific resource by id
func (r *Request) Name(s string) *Request { func (r *Request) Name(s string) *Request {
r.resourceName = &s r.resourceName = &s
@ -154,6 +162,9 @@ func (r *Request) request() (*http.Request, error) {
// append resourceName if it is present // append resourceName if it is present
if r.resourceName != nil { if r.resourceName != nil {
url += *r.resourceName url += *r.resourceName
if r.subResource != nil {
url += "/" + *r.subResource
}
} }
// append any query params // append any query params
@ -202,6 +213,20 @@ func (r *Request) Do() *Response {
return newResponse(res, err) return newResponse(res, err)
} }
// Raw performs a Raw HTTP request to the Kubernetes API
func (r *Request) Raw() (*http.Response, error) {
req, err := r.request()
if err != nil {
return nil, err
}
res, err := r.client.Do(req)
if err != nil {
return nil, err
}
return res, nil
}
// Options ... // Options ...
type Options struct { type Options struct {
Host string Host string

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"errors" "errors"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
@ -116,6 +117,21 @@ func (c *client) Get(r *Resource, labels map[string]string) error {
Into(r.Value) Into(r.Value)
} }
// Logs returns logs for a pod
func (c *client) Logs(podName string) (io.ReadCloser, error) {
req := api.NewRequest(c.opts).
Get().
Resource("pod").
SubResource("log").
Name(podName)
resp, err := req.Raw()
if err != nil {
return nil, err
}
return resp.Body, nil
}
// Update updates API object // Update updates API object
func (c *client) Update(r *Resource) error { func (c *client) Update(r *Resource) error {
req := api.NewRequest(c.opts). req := api.NewRequest(c.opts).