micro/runtime/kubernetes/kubernetes_logs.go
Janos Dobronszki 2cafa289b6
Stop LogStream if there is an error in k8s pod log streaming (#1469)
* Stop LogStream if there is an error in k8s pod log streaming

* Locking stream Stops

* PR comment
2020-04-02 12:16:35 +01:00

176 lines
3.6 KiB
Go

// Package kubernetes taken from https://github.com/micro/go-micro/blob/master/debug/log/kubernetes/kubernetes.go
// There are some modifications compared to the other files as
// this package doesn't provide write functionality.
// With the write functionality gone, structured logs also go away.
package kubernetes
import (
"bufio"
"strconv"
"time"
"github.com/micro/go-micro/v2/runtime"
"github.com/micro/go-micro/v2/util/kubernetes/client"
"github.com/micro/go-micro/v2/util/log"
)
type klog struct {
client client.Client
serviceName string
options runtime.LogsOptions
}
func (k *klog) podLogStream(podName string, stream *kubeStream) error {
p := make(map[string]string)
p["follow"] = "true"
// get the logs for the pod
body, err := k.client.Log(&client.Resource{
Name: podName,
Kind: "pod",
}, client.LogParams(p))
if err != nil {
stream.err = err
stream.Stop()
return err
}
s := bufio.NewScanner(body)
defer body.Close()
for {
select {
case <-stream.stop:
return stream.Error()
default:
if s.Scan() {
record := runtime.LogRecord{
Message: s.Text(),
}
stream.stream <- record
} else {
// TODO: is there a blocking call
// rather than a sleep loop?
time.Sleep(time.Second)
}
}
}
}
func (k *klog) getMatchingPods() ([]string, error) {
r := &client.Resource{
Kind: "pod",
Value: new(client.PodList),
}
l := make(map[string]string)
l["name"] = client.Format(k.serviceName)
// TODO: specify micro:service
// l["micro"] = "service"
if err := k.client.Get(r, l); err != nil {
return nil, err
}
var matches []string
for _, p := range r.Value.(*client.PodList).Items {
// find labels that match the name
if p.Metadata.Labels["name"] == client.Format(k.serviceName) {
matches = append(matches, p.Metadata.Name)
}
}
return matches, nil
}
func (k *klog) Read() ([]runtime.LogRecord, error) {
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
var records []runtime.LogRecord
for _, pod := range pods {
logParams := make(map[string]string)
//if !opts.Since.Equal(time.Time{}) {
// logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds()))
//}
if k.options.Count != 0 {
logParams["tailLines"] = strconv.Itoa(int(k.options.Count))
}
if k.options.Stream == true {
logParams["follow"] = "true"
}
logs, err := k.client.Log(&client.Resource{
Name: pod,
Kind: "pod",
}, client.LogParams(logParams))
if err != nil {
return nil, err
}
defer logs.Close()
s := bufio.NewScanner(logs)
for s.Scan() {
record := runtime.LogRecord{
Message: s.Text(),
}
// record.Metadata["pod"] = pod
records = append(records, record)
}
}
// sort the records
// sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) })
return records, nil
}
func (k *klog) Stream() (runtime.LogStream, error) {
// find the matching pods
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
stream := &kubeStream{
stream: make(chan runtime.LogRecord),
stop: make(chan bool),
}
// stream from the individual pods
for _, pod := range pods {
go func(podName string) {
err := k.podLogStream(podName, stream)
if err != nil {
log.Errorf("Error streaming from pod: %v", err)
}
}(pod)
}
return stream, nil
}
// NewLog returns a configured Kubernetes logger
func newLog(client client.Client, serviceName string, opts ...runtime.LogsOption) *klog {
klog := &klog{
serviceName: serviceName,
client: client,
}
for _, o := range opts {
o(&klog.options)
}
return klog
}