simplify runtime logs

This commit is contained in:
Asim Aslam 2020-08-11 22:57:30 +01:00
parent 69a53e8070
commit 375b67ee16
4 changed files with 23 additions and 22 deletions

View File

@ -347,7 +347,7 @@ func (k *kubernetes) Init(opts ...runtime.Option) error {
return nil return nil
} }
func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) { func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) {
klo := newLog(k.client, s.Name, options...) klo := newLog(k.client, s.Name, options...)
if !klo.options.Stream { if !klo.options.Stream {
@ -357,7 +357,7 @@ func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (ru
return nil, err return nil, err
} }
kstream := &kubeStream{ kstream := &kubeStream{
stream: make(chan runtime.LogRecord), stream: make(chan runtime.Log),
stop: make(chan bool), stop: make(chan bool),
} }
go func() { go func() {
@ -377,7 +377,7 @@ func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (ru
type kubeStream struct { type kubeStream struct {
// the k8s log stream // the k8s log stream
stream chan runtime.LogRecord stream chan runtime.Log
// the stop chan // the stop chan
sync.Mutex sync.Mutex
stop chan bool stop chan bool
@ -388,7 +388,7 @@ func (k *kubeStream) Error() error {
return k.err return k.err
} }
func (k *kubeStream) Chan() chan runtime.LogRecord { func (k *kubeStream) Chan() chan runtime.Log {
return k.stream return k.stream
} }

View File

@ -21,7 +21,7 @@ type klog struct {
options runtime.LogsOptions options runtime.LogsOptions
} }
func (k *klog) podLogStream(podName string, stream *kubeStream) error { func (k *klog) podLogs(podName string, stream *kubeStream) error {
p := make(map[string]string) p := make(map[string]string)
p["follow"] = "true" p["follow"] = "true"
@ -51,7 +51,7 @@ func (k *klog) podLogStream(podName string, stream *kubeStream) error {
return stream.Error() return stream.Error()
default: default:
if s.Scan() { if s.Scan() {
record := runtime.LogRecord{ record := runtime.Log{
Message: s.Text(), Message: s.Text(),
} }
select { select {
@ -101,7 +101,7 @@ func (k *klog) getMatchingPods() ([]string, error) {
return matches, nil return matches, nil
} }
func (k *klog) Read() ([]runtime.LogRecord, error) { func (k *klog) Read() ([]runtime.Log, error) {
pods, err := k.getMatchingPods() pods, err := k.getMatchingPods()
if err != nil { if err != nil {
return nil, err return nil, err
@ -110,7 +110,7 @@ func (k *klog) Read() ([]runtime.LogRecord, error) {
return nil, errors.NotFound("runtime.logs", "no such service") return nil, errors.NotFound("runtime.logs", "no such service")
} }
var records []runtime.LogRecord var records []runtime.Log
for _, pod := range pods { for _, pod := range pods {
logParams := make(map[string]string) logParams := make(map[string]string)
@ -145,7 +145,7 @@ func (k *klog) Read() ([]runtime.LogRecord, error) {
s := bufio.NewScanner(logs) s := bufio.NewScanner(logs)
for s.Scan() { for s.Scan() {
record := runtime.LogRecord{ record := runtime.Log{
Message: s.Text(), Message: s.Text(),
} }
// record.Metadata["pod"] = pod // record.Metadata["pod"] = pod
@ -159,7 +159,7 @@ func (k *klog) Read() ([]runtime.LogRecord, error) {
return records, nil return records, nil
} }
func (k *klog) Stream() (runtime.LogStream, error) { func (k *klog) Stream() (runtime.Logs, error) {
// find the matching pods // find the matching pods
pods, err := k.getMatchingPods() pods, err := k.getMatchingPods()
if err != nil { if err != nil {
@ -170,14 +170,14 @@ func (k *klog) Stream() (runtime.LogStream, error) {
} }
stream := &kubeStream{ stream := &kubeStream{
stream: make(chan runtime.LogRecord), stream: make(chan runtime.Log),
stop: make(chan bool), stop: make(chan bool),
} }
// stream from the individual pods // stream from the individual pods
for _, pod := range pods { for _, pod := range pods {
go func(podName string) { go func(podName string) {
err := k.podLogStream(podName, stream) err := k.podLogs(podName, stream)
if err != nil { if err != nil {
log.Errorf("Error streaming from pod: %v", err) log.Errorf("Error streaming from pod: %v", err)
} }

View File

@ -329,14 +329,14 @@ func exists(path string) (bool, error) {
// The reason for this is because it's hard to calculate line offset // The reason for this is because it's hard to calculate line offset
// as opposed to character offset. // as opposed to character offset.
// This logger streams by default and only supports the `StreamCount` option. // This logger streams by default and only supports the `StreamCount` option.
func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) { func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.Logs, error) {
lopts := runtime.LogsOptions{} lopts := runtime.LogsOptions{}
for _, o := range options { for _, o := range options {
o(&lopts) o(&lopts)
} }
ret := &logStream{ ret := &logStream{
service: s.Name, service: s.Name,
stream: make(chan runtime.LogRecord), stream: make(chan runtime.Log),
stop: make(chan bool), stop: make(chan bool),
} }
@ -380,7 +380,7 @@ func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (
ret.Stop() ret.Stop()
return return
} }
ret.stream <- runtime.LogRecord{Message: line.Text} ret.stream <- runtime.Log{Message: line.Text}
case <-ret.stop: case <-ret.stop:
return return
} }
@ -393,13 +393,13 @@ func (r *localRuntime) Logs(s *runtime.Service, options ...runtime.LogsOption) (
type logStream struct { type logStream struct {
tail *tail.Tail tail *tail.Tail
service string service string
stream chan runtime.LogRecord stream chan runtime.Log
sync.Mutex sync.Mutex
stop chan bool stop chan bool
err error err error
} }
func (l *logStream) Chan() chan runtime.LogRecord { func (l *logStream) Chan() chan runtime.Log {
return l.stream return l.stream
} }

View File

@ -23,7 +23,7 @@ type Runtime interface {
// Remove a service // Remove a service
Delete(*Service, ...DeleteOption) error Delete(*Service, ...DeleteOption) error
// Logs returns the logs for a service // Logs returns the logs for a service
Logs(*Service, ...LogsOption) (LogStream, error) Logs(*Service, ...LogsOption) (Logs, error)
// Start starts the runtime // Start starts the runtime
Start() error Start() error
// Stop shuts down the runtime // Stop shuts down the runtime
@ -32,14 +32,15 @@ type Runtime interface {
String() string String() string
} }
// Stream returns a log stream // Logs returns a log stream
type LogStream interface { type Logs interface {
Error() error Error() error
Chan() chan LogRecord Chan() chan Log
Stop() error Stop() error
} }
type LogRecord struct { // Log is a log message
type Log struct {
Message string Message string
Metadata map[string]string Metadata map[string]string
} }