Stop LogStream if there is an error in k8s pod log streaming (#1469)

* Stop LogStream if there is an error in k8s pod log streaming

* Locking stream Stops

* PR comment
This commit is contained in:
Janos Dobronszki 2020-04-02 13:16:35 +02:00 committed by GitHub
parent 0241197c6a
commit 2cafa289b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"sync"
"github.com/micro/go-micro/v2/debug/log" "github.com/micro/go-micro/v2/debug/log"
) )
@ -20,6 +21,7 @@ func write(l log.Record) error {
type kubeStream struct { type kubeStream struct {
// the k8s log stream // the k8s log stream
stream chan log.Record stream chan log.Record
sync.Mutex
// the stop chan // the stop chan
stop chan bool stop chan bool
} }
@ -29,6 +31,8 @@ func (k *kubeStream) Chan() <-chan log.Record {
} }
func (k *kubeStream) Stop() error { func (k *kubeStream) Stop() error {
k.Lock()
defer k.Unlock()
select { select {
case <-k.stop: case <-k.stop:
return nil return nil

View File

@ -259,8 +259,9 @@ type logStream struct {
tail *tail.Tail tail *tail.Tail
service string service string
stream chan LogRecord stream chan LogRecord
stop chan bool sync.Mutex
err error stop chan bool
err error
} }
func (l *logStream) Chan() chan LogRecord { func (l *logStream) Chan() chan LogRecord {
@ -272,6 +273,8 @@ func (l *logStream) Error() error {
} }
func (l *logStream) Stop() error { func (l *logStream) Stop() error {
l.Lock()
defer l.Unlock()
// @todo seems like this is causing a hangup // @todo seems like this is causing a hangup
//err := l.tail.Stop() //err := l.tail.Stop()
//if err != nil { //if err != nil {

View File

@ -330,6 +330,7 @@ type kubeStream struct {
// the k8s log stream // the k8s log stream
stream chan runtime.LogRecord stream chan runtime.LogRecord
// the stop chan // the stop chan
sync.Mutex
stop chan bool stop chan bool
err error err error
} }
@ -343,6 +344,8 @@ func (k *kubeStream) Chan() chan runtime.LogRecord {
} }
func (k *kubeStream) Stop() error { func (k *kubeStream) Stop() error {
k.Lock()
defer k.Unlock()
select { select {
case <-k.stop: case <-k.stop:
return nil return nil

View File

@ -32,6 +32,7 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) error {
if err != nil { if err != nil {
stream.err = err stream.err = err
stream.Stop()
return err return err
} }

View File

@ -95,8 +95,9 @@ func (s *svc) Logs(service *runtime.Service, options ...runtime.LogsOption) (run
type serviceLogStream struct { type serviceLogStream struct {
service string service string
stream chan runtime.LogRecord stream chan runtime.LogRecord
stop chan bool sync.Mutex
err error stop chan bool
err error
} }
func (l *serviceLogStream) Error() error { func (l *serviceLogStream) Error() error {
@ -108,6 +109,8 @@ func (l *serviceLogStream) Chan() chan runtime.LogRecord {
} }
func (l *serviceLogStream) Stop() error { func (l *serviceLogStream) Stop() error {
l.Lock()
defer l.Unlock()
select { select {
case <-l.stop: case <-l.stop:
return nil return nil