log.Errorf when pod streaming fails (#1463)
* log.Errorf when pod streaming fails * Error method added for loggers Co-authored-by: Asim Aslam <asim@aslam.me>
This commit is contained in:
@@ -260,12 +260,17 @@ type logStream struct {
|
||||
service string
|
||||
stream chan LogRecord
|
||||
stop chan bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (l *logStream) Chan() chan LogRecord {
|
||||
return l.stream
|
||||
}
|
||||
|
||||
func (l *logStream) Error() error {
|
||||
return l.err
|
||||
}
|
||||
|
||||
func (l *logStream) Stop() error {
|
||||
// @todo seems like this is causing a hangup
|
||||
//err := l.tail.Stop()
|
||||
|
@@ -331,6 +331,11 @@ type kubeStream struct {
|
||||
stream chan runtime.LogRecord
|
||||
// the stop chan
|
||||
stop chan bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (k *kubeStream) Error() error {
|
||||
return k.err
|
||||
}
|
||||
|
||||
func (k *kubeStream) Chan() chan runtime.LogRecord {
|
||||
|
@@ -6,13 +6,12 @@ package kubernetes
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v2/runtime"
|
||||
"github.com/micro/go-micro/v2/util/kubernetes/client"
|
||||
"github.com/micro/go-micro/v2/util/log"
|
||||
)
|
||||
|
||||
type klog struct {
|
||||
@@ -21,7 +20,7 @@ type klog struct {
|
||||
options runtime.LogsOptions
|
||||
}
|
||||
|
||||
func (k *klog) podLogStream(podName string, stream *kubeStream) {
|
||||
func (k *klog) podLogStream(podName string, stream *kubeStream) error {
|
||||
p := make(map[string]string)
|
||||
p["follow"] = "true"
|
||||
|
||||
@@ -32,8 +31,8 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) {
|
||||
}, client.LogParams(p))
|
||||
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, err.Error())
|
||||
return
|
||||
stream.err = err
|
||||
return err
|
||||
}
|
||||
|
||||
s := bufio.NewScanner(body)
|
||||
@@ -42,7 +41,7 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) {
|
||||
for {
|
||||
select {
|
||||
case <-stream.stop:
|
||||
return
|
||||
return stream.Error()
|
||||
default:
|
||||
if s.Scan() {
|
||||
record := runtime.LogRecord{
|
||||
@@ -150,7 +149,12 @@ func (k *klog) Stream() (runtime.LogStream, error) {
|
||||
|
||||
// stream from the individual pods
|
||||
for _, pod := range pods {
|
||||
go k.podLogStream(pod, stream)
|
||||
go func(podName string) {
|
||||
err := k.podLogStream(podName, stream)
|
||||
if err != nil {
|
||||
log.Errorf("Error streaming from pod: %v", err)
|
||||
}
|
||||
}(pod)
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
|
@@ -41,6 +41,7 @@ type Runtime interface {
|
||||
|
||||
// Stream returns a log stream
|
||||
type LogStream interface {
|
||||
Error() error
|
||||
Chan() chan LogRecord
|
||||
Stop() error
|
||||
}
|
||||
|
@@ -96,6 +96,11 @@ type serviceLogStream struct {
|
||||
service string
|
||||
stream chan runtime.LogRecord
|
||||
stop chan bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (l *serviceLogStream) Error() error {
|
||||
return l.err
|
||||
}
|
||||
|
||||
func (l *serviceLogStream) Chan() chan runtime.LogRecord {
|
||||
|
Reference in New Issue
Block a user