From ce33e3b072bbd983de4662e366e269544a7f6248 Mon Sep 17 00:00:00 2001 From: Jake Sanders Date: Fri, 20 Dec 2019 23:16:05 +0000 Subject: [PATCH] Kubernetes logging (#1054) * wip * Implementation of Kubernetes Logger * Missing file * Skip test in Travis --- debug/log/kubernetes/kubernetes.go | 152 ++++++++++++++++++++++-- debug/log/kubernetes/kubernetes_test.go | 12 +- debug/log/kubernetes/util.go | 15 +++ util/kubernetes/client/api/request.go | 4 + util/kubernetes/client/client.go | 35 +++++- util/kubernetes/client/kubernetes.go | 2 +- util/kubernetes/client/logoptions.go | 13 ++ util/kubernetes/client/types.go | 6 +- 8 files changed, 224 insertions(+), 15 deletions(-) create mode 100644 debug/log/kubernetes/util.go create mode 100644 util/kubernetes/client/logoptions.go diff --git a/debug/log/kubernetes/kubernetes.go b/debug/log/kubernetes/kubernetes.go index 06ecf790..ff727a3b 100644 --- a/debug/log/kubernetes/kubernetes.go +++ b/debug/log/kubernetes/kubernetes.go @@ -2,15 +2,62 @@ package kubernetes import ( - "errors" + "bufio" + "encoding/json" + "fmt" + "os" + "sort" + "strconv" + "time" "github.com/micro/go-micro/debug/log" + "github.com/micro/go-micro/util/kubernetes/client" ) -type klog struct{} +type klog struct { + client client.Kubernetes -func (k *klog) Read(...log.ReadOption) ([]log.Record, error) { - return nil, errors.New("not implemented") + log.Options +} + +func (k *klog) Read(options ...log.ReadOption) ([]log.Record, error) { + opts := &log.ReadOptions{} + for _, o := range options { + o(opts) + } + + logsToGet, err := k.getMatchingPods() + if err != nil { + return nil, err + } + records := []log.Record{} + + for _, l := range logsToGet { + logParams := make(map[string]string) + if !opts.Since.Equal(time.Time{}) { + logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds())) + } + if opts.Count != 0 { + logParams["tailLines"] = strconv.Itoa(opts.Count) + } + if opts.Stream == true { + logParams["follow"] = "true" + } + logs, err := k.client.Logs(l, client.AdditionalParams(logParams)) + if err != nil { + return nil, err + } + defer logs.Close() + s := bufio.NewScanner(logs) + for s.Scan() { + record := k.parse(s.Text()) + record.Metadata["pod"] = l + records = append(records, record) + } + } + + sort.Sort(byTimestamp(records)) + return records, nil } func (k *klog) Write(l log.Record) error { @@ -18,13 +65,102 @@ func (k *klog) Write(l log.Record) error { } func (k *klog) Stream() (log.Stream, error) { - return &klogStreamer{ + return k.stream() +} + +func (k *klog) stream() (log.Stream, error) { + pods, err := k.getMatchingPods() + if err != nil { + return nil, err + } + logStreamer := &klogStreamer{ streamChan: make(chan log.Record), stop: make(chan bool), - }, nil + } + errorChan := make(chan error) + go func(stopChan <-chan bool) { + for { + select { + case <-stopChan: + return + case err := <-errorChan: + fmt.Fprintf(os.Stderr, err.Error()) + } + } + }(logStreamer.stop) + for _, pod := range pods { + go k.individualPodLogStreamer(pod, logStreamer.streamChan, errorChan, logStreamer.stop) + } + return logStreamer, nil +} + +func (k *klog) individualPodLogStreamer(podName string, recordChan chan<- log.Record, errorChan chan<- error, stopChan <-chan bool) { + p := make(map[string]string) + p["follow"] = "true" + body, err := k.client.Logs(podName, client.AdditionalParams(p)) + if err != nil { + errorChan <- err + return + } + s := bufio.NewScanner(body) + defer body.Close() + for { + select { + case <-stopChan: + return + default: + if s.Scan() { + record := k.parse(s.Text()) + recordChan <- record + } else { + time.Sleep(time.Second) + } + } + } } // New returns a configured Kubernetes logger -func New() log.Log { - return &klog{} +func New(opts ...log.Option) log.Log { + klog := &klog{} + for _, o := range opts { + o(&klog.Options) + } + + if len(os.Getenv("KUBERNETES_SERVICE_HOST")) > 0 { + klog.client = client.NewClientInCluster() + } else { + klog.client = client.NewLocalDevClient() + } + return klog +} + +func (k *klog) getMatchingPods() ([]string, error) { + r := &client.Resource{ + Kind: "pod", + Value: new(client.PodList), + } + l := make(map[string]string) + l["micro"] = "runtime" + if err := k.client.Get(r, l); err != nil { + return nil, err + } + + var matches []string + for _, p := range r.Value.(*client.PodList).Items { + if p.Metadata.Labels["name"] == k.Options.Name { + matches = append(matches, p.Metadata.Name) + } + } + return matches, nil +} + +func (k *klog) parse(line string) log.Record { + record := log.Record{} + if err := json.Unmarshal([]byte(line), &record); err != nil { + record.Timestamp = time.Now().UTC() + record.Message = line + record.Metadata = make(map[string]string) + } + record.Metadata["service"] = k.Options.Name + return record } diff --git a/debug/log/kubernetes/kubernetes_test.go b/debug/log/kubernetes/kubernetes_test.go index 27c3a6d9..aabe97f3 100644 --- a/debug/log/kubernetes/kubernetes_test.go +++ b/debug/log/kubernetes/kubernetes_test.go @@ -13,7 +13,10 @@ import ( ) func TestKubernetes(t *testing.T) { - k := New() + if os.Getenv("IN_TRAVIS_CI") == "yes" { + t.Skip("In Travis CI") + } + k := New(log.Name("micro-network")) r, w, err := os.Pipe() if err != nil { @@ -39,14 +42,15 @@ func TestKubernetes(t *testing.T) { } assert.Equal(t, write, read, "Write was not equal") - _, err = k.Read() - assert.Error(t, err, "Read should be unimplemented") + records, err := k.Read() + assert.Nil(t, err, "Read should not error") + assert.NotNil(t, records, "Read should return records") stream, err := k.Stream() if err != nil { t.Error(err) } - records := []log.Record{} + records = []log.Record{} go stream.Stop() for s := range stream.Chan() { records = append(records, s) diff --git a/debug/log/kubernetes/util.go b/debug/log/kubernetes/util.go new file mode 100644 index 00000000..9b259e6e --- /dev/null +++ b/debug/log/kubernetes/util.go @@ -0,0 +1,15 @@ +package kubernetes + +import "github.com/micro/go-micro/debug/log" + +// ByTimestamp lets you sort log records by Timestamp (implements Sort.Sort) +type byTimestamp []log.Record + +// Len returns the number of Log records (implements Sort.Sort) +func (b byTimestamp) Len() int { return len(b) } + +// Swap swaps 2 Log records (implements Sort.Sort) +func (b byTimestamp) Swap(i, j int) { b[i], b[j] = b[j], b[i] } + +// Less checks if a record was before another record (implements Sort.Sort) +func (b byTimestamp) Less(i, j int) bool { return b[i].Timestamp.Before(b[j].Timestamp) } diff --git a/util/kubernetes/client/api/request.go b/util/kubernetes/client/api/request.go index 53d57195..de314dca 100644 --- a/util/kubernetes/client/api/request.go +++ b/util/kubernetes/client/api/request.go @@ -34,6 +34,7 @@ type Request struct { type Params struct { LabelSelector map[string]string Annotations map[string]string + Additional map[string]string } // verb sets method @@ -136,6 +137,9 @@ func (r *Request) Params(p *Params) *Request { // set and overwrite the value r.params.Set("labelSelector", value) } + for k, v := range p.Additional { + r.params.Set(k, v) + } return r } diff --git a/util/kubernetes/client/client.go b/util/kubernetes/client/client.go index 821ee3da..ff82f837 100644 --- a/util/kubernetes/client/client.go +++ b/util/kubernetes/client/client.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "path" + "strconv" "github.com/micro/go-micro/util/kubernetes/client/api" "github.com/micro/go-micro/util/log" @@ -26,6 +27,26 @@ type client struct { opts *api.Options } +// NewLocalDevClient returns a client that can be used with `kubectl proxy` on an optional port +func NewLocalDevClient(port ...int) *client { + var p int + if len(port) > 1 { + log.Fatal("Expected 0 or 1 port parameters") + } + if len(port) == 0 { + p = 8001 + } else { + p = port[0] + } + return &client{ + opts: &api.Options{ + Client: http.DefaultClient, + Host: "http://localhost:" + strconv.Itoa(p), + Namespace: "default", + }, + } +} + // NewClientInCluster creates a Kubernetes client for use from within a k8s pod. func NewClientInCluster() *client { host := "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_SERVICE_PORT") @@ -118,17 +139,29 @@ func (c *client) Get(r *Resource, labels map[string]string) error { } // Logs returns logs for a pod -func (c *client) Logs(podName string) (io.ReadCloser, error) { +func (c *client) Logs(podName string, options ...LogOption) (io.ReadCloser, error) { + opts := &LogOptions{} + for _, o := range options { + o(opts) + } req := api.NewRequest(c.opts). Get(). Resource("pod"). SubResource("log"). Name(podName) + if opts.AdditionalParams != nil { + req.Params(&api.Params{Additional: opts.AdditionalParams}) + } + resp, err := req.Raw() if err != nil { return nil, err } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + resp.Body.Close() + return nil, errors.New(resp.Request.URL.String() + ": " + resp.Status) + } return resp.Body, nil } diff --git a/util/kubernetes/client/kubernetes.go b/util/kubernetes/client/kubernetes.go index 8e30c923..761602ac 100644 --- a/util/kubernetes/client/kubernetes.go +++ b/util/kubernetes/client/kubernetes.go @@ -26,7 +26,7 @@ type Kubernetes interface { // List lists API resources List(*Resource) error // Logs gets logs from a pod - Logs(string) (io.ReadCloser, error) + Logs(string, ...LogOption) (io.ReadCloser, error) } // NewService returns default micro kubernetes service definition diff --git a/util/kubernetes/client/logoptions.go b/util/kubernetes/client/logoptions.go new file mode 100644 index 00000000..349d0c59 --- /dev/null +++ b/util/kubernetes/client/logoptions.go @@ -0,0 +1,13 @@ +package client + +type LogOptions struct { + AdditionalParams map[string]string +} + +type LogOption func(*LogOptions) + +func AdditionalParams(p map[string]string) LogOption { + return func(l *LogOptions) { + l.AdditionalParams = p + } +} diff --git a/util/kubernetes/client/types.go b/util/kubernetes/client/types.go index b9bfa4a3..588f9e40 100644 --- a/util/kubernetes/client/types.go +++ b/util/kubernetes/client/types.go @@ -79,7 +79,7 @@ type Container struct { Ports []ContainerPort `json:"ports,omitempty"` } -// PodSpec +// PodSpec is a pod type PodSpec struct { Containers []Container `json:"containers"` } @@ -131,3 +131,7 @@ type Deployment struct { type DeploymentList struct { Items []Deployment `json:"items"` } + +type PodList struct { + Items []Template `json:"items"` +}