micro/debug/log/kubernetes/kubernetes.go

191 lines
3.6 KiB
Go
Raw Normal View History

// Package kubernetes implements the Log interface of github.com/micro/go-micro/v2/debug/log on top of Kubernetes pod logs
2019-12-17 12:11:26 +00:00
package kubernetes
import (
"bufio"
"encoding/json"
"fmt"
"os"
"sort"
"strconv"
"time"
2019-12-17 17:24:01 +00:00
"github.com/micro/go-micro/v2/debug/log"
"github.com/micro/go-micro/v2/util/kubernetes/client"
2019-12-17 12:11:26 +00:00
)
type klog struct {
2019-12-24 17:51:30 +00:00
client client.Client
2019-12-17 12:11:26 +00:00
log.Options
}
2019-12-20 23:34:08 +00:00
func (k *klog) podLogStream(podName string, stream *kubeStream) {
p := make(map[string]string)
p["follow"] = "true"
2019-12-24 17:33:05 +00:00
// get the logs for the pod
body, err := k.client.Log(&client.Resource{
Name: podName,
Kind: "pod",
}, client.LogParams(p))
2019-12-20 23:34:08 +00:00
if err != nil {
fmt.Fprintf(os.Stderr, err.Error())
return
}
s := bufio.NewScanner(body)
defer body.Close()
for {
select {
case <-stream.stop:
return
default:
if s.Scan() {
record := k.parse(s.Text())
stream.stream <- record
} else {
// TODO: is there a blocking call
// rather than a sleep loop?
time.Sleep(time.Second)
}
}
}
}
// getMatchingPods asks the Kubernetes client for pods, passing the formatted
// service name as the "name" label, and returns the names of the pods whose
// "name" label equals that value. Any error from the client is returned as is.
func (k *klog) getMatchingPods() ([]string, error) {
	resource := &client.Resource{
		Kind:  "pod",
		Value: new(client.PodList),
	}

	// TODO: specify micro:service
	// labels["micro"] = "service"
	labels := map[string]string{
		"name": client.Format(k.Options.Name),
	}

	if err := k.client.Get(resource, labels); err != nil {
		return nil, err
	}

	var names []string
	for _, pod := range resource.Value.(*client.PodList).Items {
		// skip pods whose name label does not match the service
		if pod.Metadata.Labels["name"] != client.Format(k.Options.Name) {
			continue
		}
		names = append(names, pod.Metadata.Name)
	}

	return names, nil
}
// parse converts a single log line into a log.Record.
//
// The line is first decoded as JSON. If decoding fails, the line is treated as
// plain text: the record gets the current UTC time as its timestamp, the raw
// line as its message and empty metadata. In both cases the returned record has
// Metadata["service"] set to the configured service name.
func (k *klog) parse(line string) log.Record {
	record := log.Record{}

	if err := json.Unmarshal([]byte(line), &record); err != nil {
		record.Timestamp = time.Now().UTC()
		record.Message = line
		record.Metadata = make(map[string]string)
	}

	// a line that is valid JSON but carries no metadata leaves the map nil;
	// writing to a nil map panics, so make sure it exists first
	if record.Metadata == nil {
		record.Metadata = make(map[string]string)
	}

	record.Metadata["service"] = k.Options.Name

	return record
}
func (k *klog) Read(options ...log.ReadOption) ([]log.Record, error) {
opts := &log.ReadOptions{}
for _, o := range options {
o(opts)
}
2019-12-24 17:33:05 +00:00
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
2019-12-20 23:34:08 +00:00
var records []log.Record
2019-12-24 17:33:05 +00:00
for _, pod := range pods {
logParams := make(map[string]string)
2019-12-20 23:34:08 +00:00
if !opts.Since.Equal(time.Time{}) {
logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds()))
}
2019-12-20 23:34:08 +00:00
if opts.Count != 0 {
logParams["tailLines"] = strconv.Itoa(opts.Count)
}
2019-12-20 23:34:08 +00:00
if opts.Stream == true {
logParams["follow"] = "true"
}
2019-12-20 23:34:08 +00:00
2019-12-24 17:33:05 +00:00
logs, err := k.client.Log(&client.Resource{
Name: pod,
Kind: "pod",
}, client.LogParams(logParams))
if err != nil {
return nil, err
}
defer logs.Close()
2019-12-20 23:34:08 +00:00
s := bufio.NewScanner(logs)
2019-12-20 23:34:08 +00:00
for s.Scan() {
record := k.parse(s.Text())
2019-12-24 17:33:05 +00:00
record.Metadata["pod"] = pod
records = append(records, record)
}
}
2019-12-20 23:34:08 +00:00
// sort the records
sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) })
return records, nil
2019-12-17 17:24:01 +00:00
}
2019-12-17 12:11:26 +00:00
2019-12-17 17:24:01 +00:00
// Write passes the record to the package-level write function and returns
// whatever error it reports.
func (k *klog) Write(rec log.Record) error {
	return write(rec)
}
2019-12-17 12:11:26 +00:00
2019-12-17 17:24:01 +00:00
func (k *klog) Stream() (log.Stream, error) {
2019-12-20 23:34:08 +00:00
// find the matching pods
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
2019-12-20 23:34:08 +00:00
stream := &kubeStream{
stream: make(chan log.Record),
stop: make(chan bool),
}
2019-12-20 23:34:08 +00:00
// stream from the individual pods
for _, pod := range pods {
2019-12-20 23:34:08 +00:00
go k.podLogStream(pod, stream)
}
2019-12-20 23:34:08 +00:00
return stream, nil
2019-12-17 12:11:26 +00:00
}
2019-12-20 23:36:16 +00:00
// NewLog returns a configured Kubernetes logger.
//
// The given options are applied to the logger's embedded Options. The client is
// chosen from the environment: a cluster client when KUBERNETES_SERVICE_HOST is
// set to a non-empty value, a local client otherwise.
func NewLog(opts ...log.Option) log.Log {
	kl := new(klog)
	for _, apply := range opts {
		apply(&kl.Options)
	}

	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
		kl.client = client.NewLocalClient()
		return kl
	}

	kl.client = client.NewClusterClient()
	return kl
}