Runtime logs (#1447)

* Runtime logs

* Slightly broken

* Pushing for diff

* Log trailing works locally

* LogsOptions

* Comments and streamcount support for local logs

* Adding kubernetes logs

* Fixing k8s logs

* K8s fixes

* StreamCount is now nuked

* PR comments

* PR comments again

* Fix typo
This commit is contained in:
Janos Dobronszki
2020-04-01 15:40:15 +02:00
committed by GitHub
parent 20c95d94cd
commit bb51b8203e
11 changed files with 744 additions and 299 deletions

View File

@@ -301,6 +301,53 @@ func (k *kubernetes) Init(opts ...runtime.Option) error {
return nil
}
func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) {
klo := newLog(k.client, s.Name, options...)
stream, err := klo.Stream()
if err != nil {
return nil, err
}
// If requested, also read existing records and stream those too
if klo.options.Count > 0 {
go func() {
records, err := klo.Read()
if err != nil {
logger.Errorf("Failed to get logs for service '%v' from k8s: %v", err)
return
}
// @todo: this might actually not run before podLogStream starts
// and might cause out of order log retrieval at the receiving end.
// A better approach would probably to suppor this inside the `klog.Stream` method.
for _, record := range records {
stream.Chan() <- record
}
}()
}
return stream, nil
}
type kubeStream struct {
// the k8s log stream
stream chan runtime.LogRecord
// the stop chan
stop chan bool
}
func (k *kubeStream) Chan() chan runtime.LogRecord {
return k.stream
}
func (k *kubeStream) Stop() error {
select {
case <-k.stop:
return nil
default:
close(k.stop)
close(k.stream)
}
return nil
}
// Creates a service
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
k.Lock()

View File

@@ -0,0 +1,170 @@
// Package kubernetes taken from https://github.com/micro/go-micro/blob/master/debug/log/kubernetes/kubernetes.go
// There are some modifications compared to the other files as
// this package doesn't provide write functionality.
// With the write functionality gone, structured logs also go away.
package kubernetes
import (
"bufio"
"fmt"
"os"
"strconv"
"time"
"github.com/micro/go-micro/v2/runtime"
"github.com/micro/go-micro/v2/util/kubernetes/client"
)
type klog struct {
client client.Client
serviceName string
options runtime.LogsOptions
}
func (k *klog) podLogStream(podName string, stream *kubeStream) {
p := make(map[string]string)
p["follow"] = "true"
// get the logs for the pod
body, err := k.client.Log(&client.Resource{
Name: podName,
Kind: "pod",
}, client.LogParams(p))
if err != nil {
fmt.Fprintf(os.Stderr, err.Error())
return
}
s := bufio.NewScanner(body)
defer body.Close()
for {
select {
case <-stream.stop:
return
default:
if s.Scan() {
record := runtime.LogRecord{
Message: s.Text(),
}
stream.stream <- record
} else {
// TODO: is there a blocking call
// rather than a sleep loop?
time.Sleep(time.Second)
}
}
}
}
func (k *klog) getMatchingPods() ([]string, error) {
r := &client.Resource{
Kind: "pod",
Value: new(client.PodList),
}
l := make(map[string]string)
l["name"] = client.Format(k.serviceName)
// TODO: specify micro:service
// l["micro"] = "service"
if err := k.client.Get(r, l); err != nil {
return nil, err
}
var matches []string
for _, p := range r.Value.(*client.PodList).Items {
// find labels that match the name
if p.Metadata.Labels["name"] == client.Format(k.serviceName) {
matches = append(matches, p.Metadata.Name)
}
}
return matches, nil
}
func (k *klog) Read() ([]runtime.LogRecord, error) {
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
var records []runtime.LogRecord
for _, pod := range pods {
logParams := make(map[string]string)
//if !opts.Since.Equal(time.Time{}) {
// logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds()))
//}
if k.options.Count != 0 {
logParams["tailLines"] = strconv.Itoa(int(k.options.Count))
}
if k.options.Stream == true {
logParams["follow"] = "true"
}
logs, err := k.client.Log(&client.Resource{
Name: pod,
Kind: "pod",
}, client.LogParams(logParams))
if err != nil {
return nil, err
}
defer logs.Close()
s := bufio.NewScanner(logs)
for s.Scan() {
record := runtime.LogRecord{
Message: s.Text(),
}
// record.Metadata["pod"] = pod
records = append(records, record)
}
}
// sort the records
// sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) })
return records, nil
}
func (k *klog) Stream() (runtime.LogStream, error) {
// find the matching pods
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
stream := &kubeStream{
stream: make(chan runtime.LogRecord),
stop: make(chan bool),
}
// stream from the individual pods
for _, pod := range pods {
go k.podLogStream(pod, stream)
}
return stream, nil
}
// NewLog returns a configured Kubernetes logger
func newLog(client client.Client, serviceName string, opts ...runtime.LogsOption) *klog {
klog := &klog{
serviceName: serviceName,
client: client,
}
for _, o := range opts {
o(&klog.options)
}
return klog
}