From 2cafa289b6b77a86e4c43b6a7063c2c3e52a6ecc Mon Sep 17 00:00:00 2001 From: Janos Dobronszki Date: Thu, 2 Apr 2020 13:16:35 +0200 Subject: [PATCH] Stop LogStream if there is an error in k8s pod log streaming (#1469) * Stop LogStream if there is an error in k8s pod log streaming * Locking stream Stops * PR comment --- debug/log/kubernetes/stream.go | 4 ++++ runtime/default.go | 7 +++++-- runtime/kubernetes/kubernetes.go | 3 +++ runtime/kubernetes/kubernetes_logs.go | 1 + runtime/service/service.go | 7 +++++-- 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/debug/log/kubernetes/stream.go b/debug/log/kubernetes/stream.go index e8fd3e98..50d18100 100644 --- a/debug/log/kubernetes/stream.go +++ b/debug/log/kubernetes/stream.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "os" + "sync" "github.com/micro/go-micro/v2/debug/log" ) @@ -20,6 +21,7 @@ func write(l log.Record) error { type kubeStream struct { // the k8s log stream stream chan log.Record + sync.Mutex // the stop chan stop chan bool } @@ -29,6 +31,8 @@ func (k *kubeStream) Chan() <-chan log.Record { } func (k *kubeStream) Stop() error { + k.Lock() + defer k.Unlock() select { case <-k.stop: return nil diff --git a/runtime/default.go b/runtime/default.go index c0452bc8..614319d3 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -259,8 +259,9 @@ type logStream struct { tail *tail.Tail service string stream chan LogRecord - stop chan bool - err error + sync.Mutex + stop chan bool + err error } func (l *logStream) Chan() chan LogRecord { @@ -272,6 +273,8 @@ func (l *logStream) Error() error { } func (l *logStream) Stop() error { + l.Lock() + defer l.Unlock() // @todo seems like this is causing a hangup //err := l.tail.Stop() //if err != nil { diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index e381721e..12733bdb 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -330,6 +330,7 @@ type kubeStream struct { // the k8s log stream stream chan runtime.LogRecord // the stop chan + sync.Mutex stop chan bool err error } @@ -343,6 +344,8 @@ func (k *kubeStream) Chan() chan runtime.LogRecord { } func (k *kubeStream) Stop() error { + k.Lock() + defer k.Unlock() select { case <-k.stop: return nil diff --git a/runtime/kubernetes/kubernetes_logs.go b/runtime/kubernetes/kubernetes_logs.go index 45e928a1..3600170b 100644 --- a/runtime/kubernetes/kubernetes_logs.go +++ b/runtime/kubernetes/kubernetes_logs.go @@ -32,6 +32,7 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) error { if err != nil { stream.err = err + stream.Stop() return err } diff --git a/runtime/service/service.go b/runtime/service/service.go index 4f7dd6e6..8cadfcdf 100644 --- a/runtime/service/service.go +++ b/runtime/service/service.go @@ -95,8 +95,9 @@ func (s *svc) Logs(service *runtime.Service, options ...runtime.LogsOption) (run type serviceLogStream struct { service string stream chan runtime.LogRecord - stop chan bool - err error + sync.Mutex + stop chan bool + err error } func (l *serviceLogStream) Error() error { @@ -108,6 +109,8 @@ func (l *serviceLogStream) Chan() chan runtime.LogRecord { } func (l *serviceLogStream) Stop() error { + l.Lock() + defer l.Unlock() select { case <-l.stop: return nil