Merge pull request #1049 from micro/kubernetes-logging

Kubernetes logging
This commit is contained in:
Jake Sanders 2019-12-17 17:44:24 +00:00 committed by GitHub
commit 812fe9e640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 178 additions and 6 deletions

View File

@ -0,0 +1,27 @@
// Package kubernetes is a logger implementing (github.com/micro/go-micro/debug/log).Log
package kubernetes
import (
"errors"
"github.com/micro/go-micro/debug/log"
)
type klog struct{}
func (k *klog) Read(...log.ReadOption) ([]log.Record, error) {
return nil, errors.New("not implemented")
}
func (k *klog) Write(l log.Record) error {
return write(l)
}
func (k *klog) Stream() (log.Stream, error) {
return &klogStreamer{}, nil
}
// New returns a configured Kubernetes logger
func New() log.Log {
return &klog{}
}

View File

@ -0,0 +1,56 @@
package kubernetes
import (
"bytes"
"encoding/json"
"io"
"os"
"testing"
"time"
"github.com/micro/go-micro/debug/log"
"github.com/stretchr/testify/assert"
)
func TestKubernetes(t *testing.T) {
k := New()
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
s := os.Stderr
os.Stderr = w
meta := make(map[string]string)
write := log.Record{
Timestamp: time.Unix(0, 0).UTC(),
Value: "Test log entry",
Metadata: meta,
}
meta["foo"] = "bar"
k.Write(write)
b := &bytes.Buffer{}
w.Close()
io.Copy(b, r)
os.Stderr = s
var read log.Record
if err := json.Unmarshal(b.Bytes(), &read); err != nil {
t.Fatalf("json.Unmarshal failed: %s", err.Error())
}
assert.Equal(t, write, read, "Write was not equal")
_, err = k.Read()
assert.Error(t, err, "Read should be unimplemented")
stream, err := k.Stream()
if err != nil {
t.Error(err)
}
records := []log.Record{}
go stream.Stop()
for s := range stream.Chan() {
records = append(records, s)
}
assert.Equal(t, 0, len(records), "Stream should return nothing")
}

View File

@ -0,0 +1,34 @@
package kubernetes
import "github.com/micro/go-micro/debug/log"
import (
"encoding/json"
"fmt"
"os"
)
func write(l log.Record) error {
m, err := json.Marshal(l)
if err == nil {
_, err := fmt.Fprintf(os.Stderr, "%s", m)
return err
}
return err
}
type klogStreamer struct {
streamChan chan log.Record
}
func (k *klogStreamer) Chan() <-chan log.Record {
if k.streamChan == nil {
k.streamChan = make(chan log.Record)
}
return k.streamChan
}
func (k *klogStreamer) Stop() error {
close(k.streamChan)
return nil
}

View File

@ -27,11 +27,11 @@ type Log interface {
// Record is log record entry
type Record struct {
// Timestamp of logged event
Timestamp time.Time
Timestamp time.Time `json:"time"`
// Value contains log entry
Value interface{}
Value interface{} `json:"value"`
// Metadata to enrich log record
Metadata map[string]string
Metadata map[string]string `json:"metadata"`
}
type Stream interface {

View File

@ -7,7 +7,7 @@ import (
"time"
"github.com/micro/go-micro/runtime"
"github.com/micro/go-micro/runtime/kubernetes/client"
"github.com/micro/go-micro/util/kubernetes/client"
"github.com/micro/go-micro/util/log"
)

View File

@ -5,7 +5,7 @@ import (
"time"
"github.com/micro/go-micro/runtime"
"github.com/micro/go-micro/runtime/kubernetes/client"
"github.com/micro/go-micro/util/kubernetes/client"
"github.com/micro/go-micro/util/log"
)

View File

@ -111,6 +111,17 @@ var tests = []testcase{
URI: "/apis/apps/v1/namespaces/default/deployments/baz",
Header: map[string]string{"foo": "bar"},
},
testcase{
ReqFn: func(opts *Options) *Request {
return NewRequest(opts).
Get().
Resource("pod").
SubResource("log").
Name("foolog")
},
Method: "GET",
URI: "/api/v1/namespaces/default/pods/foolog/log",
},
}
var wrappedHandler = func(test *testcase, t *testing.T) http.HandlerFunc {

View File

@ -23,6 +23,7 @@ type Request struct {
resource string
resourceName *string
subResource *string
body io.Reader
err error
@ -79,6 +80,13 @@ func (r *Request) Resource(s string) *Request {
return r
}
// SubResource sets a subresource on a resource,
// e.g. pods/log for pod logs
func (r *Request) SubResource(s string) *Request {
r.subResource = &s
return r
}
// Name is for targeting a specific resource by id
func (r *Request) Name(s string) *Request {
r.resourceName = &s
@ -154,6 +162,9 @@ func (r *Request) request() (*http.Request, error) {
// append resourceName if it is present
if r.resourceName != nil {
url += *r.resourceName
if r.subResource != nil {
url += "/" + *r.subResource
}
}
// append any query params
@ -202,6 +213,20 @@ func (r *Request) Do() *Response {
return newResponse(res, err)
}
// Raw performs a Raw HTTP request to the Kubernetes API
func (r *Request) Raw() (*http.Response, error) {
req, err := r.request()
if err != nil {
return nil, err
}
res, err := r.client.Do(req)
if err != nil {
return nil, err
}
return res, nil
}
// Options ...
type Options struct {
Host string

View File

@ -4,12 +4,13 @@ import (
"bytes"
"crypto/tls"
"errors"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"github.com/micro/go-micro/runtime/kubernetes/client/api"
"github.com/micro/go-micro/util/kubernetes/client/api"
"github.com/micro/go-micro/util/log"
)
@ -116,6 +117,21 @@ func (c *client) Get(r *Resource, labels map[string]string) error {
Into(r.Value)
}
// Logs returns logs for a pod
func (c *client) Logs(podName string) (io.ReadCloser, error) {
req := api.NewRequest(c.opts).
Get().
Resource("pod").
SubResource("log").
Name(podName)
resp, err := req.Raw()
if err != nil {
return nil, err
}
return resp.Body, nil
}
// Update updates API object
func (c *client) Update(r *Resource) error {
req := api.NewRequest(c.opts).

View File

@ -2,6 +2,7 @@
package client
import (
"io"
"strings"
"github.com/micro/go-micro/util/log"
@ -24,6 +25,8 @@ type Kubernetes interface {
Delete(*Resource) error
// List lists API resources
List(*Resource) error
// Logs gets logs from a pod
Logs(string) (io.ReadCloser, error)
}
// NewService returns default micro kubernetes service definition