From d2a3fd0b0407a36603e67a9e671d2e74752339ec Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 17 Dec 2019 16:56:55 +0000 Subject: [PATCH] Move stream to interface --- debug/log/default.go | 14 +-- debug/log/default_test.go | 2 +- debug/log/level.go | 35 ++++++++ debug/log/log.go | 151 ++------------------------------- debug/log/logger.go | 120 ++++++++++++++++++++++++++ debug/log/options.go | 7 -- debug/log/stream.go | 20 +++++ debug/service/handler/debug.go | 14 ++- debug/service/log.go | 54 +++++------- debug/service/service.go | 44 +++++----- debug/service/stream.go | 25 ++++++ 11 files changed, 269 insertions(+), 217 deletions(-) create mode 100644 debug/log/level.go create mode 100644 debug/log/logger.go create mode 100644 debug/log/stream.go create mode 100644 debug/service/stream.go diff --git a/debug/log/default.go b/debug/log/default.go index 3d79bdc9..428941f0 100644 --- a/debug/log/default.go +++ b/debug/log/default.go @@ -33,13 +33,14 @@ func NewLog(opts ...Option) Log { } // Write writes logs into logger -func (l *defaultLog) Write(r Record) { +func (l *defaultLog) Write(r Record) error { golog.Print(r.Value) l.Buffer.Put(fmt.Sprint(r.Value)) + return nil } // Read reads logs and returns them -func (l *defaultLog) Read(opts ...ReadOption) []Record { +func (l *defaultLog) Read(opts ...ReadOption) ([]Record, error) { options := ReadOptions{} // initialize the read options for _, o := range opts { @@ -78,12 +79,12 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { records = append(records, record) } - return records + return records, nil } // Stream returns channel for reading log records // along with a stop channel, close it when done -func (l *defaultLog) Stream() (<-chan Record, chan bool) { +func (l *defaultLog) Stream() (Stream, error) { // get stream channel from ring buffer stream, stop := l.Buffer.Stream() // make a buffered channel @@ -111,5 +112,8 @@ func (l *defaultLog) Stream() (<-chan Record, chan bool) { } }() - return records, stop + return &logStream{ + stream: records, + stop: stop, + }, nil } diff --git a/debug/log/default_test.go b/debug/log/default_test.go index e0f08a7c..7f5c84e8 100644 --- a/debug/log/default_test.go +++ b/debug/log/default_test.go @@ -23,7 +23,7 @@ func TestLogger(t *testing.T) { // Check if the logs are stored in the logger ring buffer expected := []string{"foobar", "foo bar"} - entries := DefaultLog.Read(Count(len(expected))) + entries, _ := DefaultLog.Read(Count(len(expected))) for i, entry := range entries { if !reflect.DeepEqual(entry.Value, expected[i]) { t.Errorf("expected %s, got %s", expected[i], entry.Value) diff --git a/debug/log/level.go b/debug/log/level.go new file mode 100644 index 00000000..2f8b425c --- /dev/null +++ b/debug/log/level.go @@ -0,0 +1,35 @@ +// Package log provides debug logging +package log + +import ( + "os" +) + +// level is a log level +type Level int + +const ( + LevelFatal Level = iota + LevelError + LevelInfo + LevelWarn + LevelDebug + LevelTrace +) + +func init() { + switch os.Getenv("MICRO_LOG_LEVEL") { + case "trace": + DefaultLevel = LevelTrace + case "debug": + DefaultLevel = LevelDebug + case "warn": + DefaultLevel = LevelWarn + case "info": + DefaultLevel = LevelInfo + case "error": + DefaultLevel = LevelError + case "fatal": + DefaultLevel = LevelFatal + } +} diff --git a/debug/log/log.go b/debug/log/log.go index 918d0c55..81a088c6 100644 --- a/debug/log/log.go +++ b/debug/log/log.go @@ -2,8 +2,6 @@ package log import ( - "fmt" - "os" "time" ) @@ -19,11 +17,11 @@ var ( // Log is event log type Log interface { // Read reads log entries from the logger - Read(...ReadOption) []Record + Read(...ReadOption) ([]Record, error) // Write writes records to log - Write(Record) + Write(Record) error // Stream log records - Stream() (<-chan Record, chan bool) + Stream() (Stream, error) } // Record is log record entry @@ -36,144 +34,7 @@ type Record struct { Metadata map[string]string } -// level is a log level -type Level int - -const ( - LevelFatal Level = iota - LevelError - LevelInfo - LevelWarn - LevelDebug - LevelTrace -) - -func init() { - switch os.Getenv("MICRO_LOG_LEVEL") { - case "trace": - DefaultLevel = LevelTrace - case "debug": - DefaultLevel = LevelDebug - case "warn": - DefaultLevel = LevelWarn - case "info": - DefaultLevel = LevelInfo - case "error": - DefaultLevel = LevelError - case "fatal": - DefaultLevel = LevelFatal - } -} - -func log(v ...interface{}) { - if len(prefix) > 0 { - DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)}) - return - } - DefaultLog.Write(Record{Value: fmt.Sprint(v...)}) -} - -func logf(format string, v ...interface{}) { - if len(prefix) > 0 { - format = prefix + " " + format - } - DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)}) -} - -// WithLevel logs with the level specified -func WithLevel(l Level, v ...interface{}) { - if l > DefaultLevel { - return - } - log(v...) -} - -// WithLevel logs with the level specified -func WithLevelf(l Level, format string, v ...interface{}) { - if l > DefaultLevel { - return - } - logf(format, v...) -} - -// Trace provides trace level logging -func Trace(v ...interface{}) { - WithLevel(LevelTrace, v...) -} - -// Tracef provides trace level logging -func Tracef(format string, v ...interface{}) { - WithLevelf(LevelTrace, format, v...) -} - -// Debug provides debug level logging -func Debug(v ...interface{}) { - WithLevel(LevelDebug, v...) -} - -// Debugf provides debug level logging -func Debugf(format string, v ...interface{}) { - WithLevelf(LevelDebug, format, v...) -} - -// Warn provides warn level logging -func Warn(v ...interface{}) { - WithLevel(LevelWarn, v...) -} - -// Warnf provides warn level logging -func Warnf(format string, v ...interface{}) { - WithLevelf(LevelWarn, format, v...) -} - -// Info provides info level logging -func Info(v ...interface{}) { - WithLevel(LevelInfo, v...) -} - -// Infof provides info level logging -func Infof(format string, v ...interface{}) { - WithLevelf(LevelInfo, format, v...) -} - -// Error provides warn level logging -func Error(v ...interface{}) { - WithLevel(LevelError, v...) -} - -// Errorf provides warn level logging -func Errorf(format string, v ...interface{}) { - WithLevelf(LevelError, format, v...) -} - -// Fatal logs with Log and then exits with os.Exit(1) -func Fatal(v ...interface{}) { - WithLevel(LevelFatal, v...) - os.Exit(1) -} - -// Fatalf logs with Logf and then exits with os.Exit(1) -func Fatalf(format string, v ...interface{}) { - WithLevelf(LevelFatal, format, v...) - os.Exit(1) -} - -// SetLevel sets the log level -func SetLevel(l Level) { - DefaultLevel = l -} - -// GetLevel returns the current level -func GetLevel() Level { - return DefaultLevel -} - -// Set a prefix for the logger -func SetPrefix(p string) { - prefix = p -} - -// Set service name -func SetName(name string) { - prefix = fmt.Sprintf("[%s]", name) +type Stream interface { + Chan() <-chan Record + Stop() error } diff --git a/debug/log/logger.go b/debug/log/logger.go new file mode 100644 index 00000000..912defc2 --- /dev/null +++ b/debug/log/logger.go @@ -0,0 +1,120 @@ +// Package log provides debug logging +package log + +import ( + "fmt" + "os" +) + +func log(v ...interface{}) { + if len(prefix) > 0 { + DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)}) + return + } + DefaultLog.Write(Record{Value: fmt.Sprint(v...)}) +} + +func logf(format string, v ...interface{}) { + if len(prefix) > 0 { + format = prefix + " " + format + } + DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)}) +} + +// WithLevel logs with the level specified +func WithLevel(l Level, v ...interface{}) { + if l > DefaultLevel { + return + } + log(v...) +} + +// WithLevel logs with the level specified +func WithLevelf(l Level, format string, v ...interface{}) { + if l > DefaultLevel { + return + } + logf(format, v...) +} + +// Trace provides trace level logging +func Trace(v ...interface{}) { + WithLevel(LevelTrace, v...) +} + +// Tracef provides trace level logging +func Tracef(format string, v ...interface{}) { + WithLevelf(LevelTrace, format, v...) +} + +// Debug provides debug level logging +func Debug(v ...interface{}) { + WithLevel(LevelDebug, v...) +} + +// Debugf provides debug level logging +func Debugf(format string, v ...interface{}) { + WithLevelf(LevelDebug, format, v...) +} + +// Warn provides warn level logging +func Warn(v ...interface{}) { + WithLevel(LevelWarn, v...) +} + +// Warnf provides warn level logging +func Warnf(format string, v ...interface{}) { + WithLevelf(LevelWarn, format, v...) +} + +// Info provides info level logging +func Info(v ...interface{}) { + WithLevel(LevelInfo, v...) +} + +// Infof provides info level logging +func Infof(format string, v ...interface{}) { + WithLevelf(LevelInfo, format, v...) +} + +// Error provides warn level logging +func Error(v ...interface{}) { + WithLevel(LevelError, v...) +} + +// Errorf provides warn level logging +func Errorf(format string, v ...interface{}) { + WithLevelf(LevelError, format, v...) +} + +// Fatal logs with Log and then exits with os.Exit(1) +func Fatal(v ...interface{}) { + WithLevel(LevelFatal, v...) + os.Exit(1) +} + +// Fatalf logs with Logf and then exits with os.Exit(1) +func Fatalf(format string, v ...interface{}) { + WithLevelf(LevelFatal, format, v...) + os.Exit(1) +} + +// SetLevel sets the log level +func SetLevel(l Level) { + DefaultLevel = l +} + +// GetLevel returns the current level +func GetLevel() Level { + return DefaultLevel +} + +// Set a prefix for the logger +func SetPrefix(p string) { + prefix = p +} + +// Set service name +func SetName(name string) { + prefix = fmt.Sprintf("[%s]", name) +} diff --git a/debug/log/options.go b/debug/log/options.go index 0d83394b..b17c2f43 100644 --- a/debug/log/options.go +++ b/debug/log/options.go @@ -60,10 +60,3 @@ func Count(c int) ReadOption { o.Count = c } } - -// Stream requests continuous log stream -func Stream(s bool) ReadOption { - return func(o *ReadOptions) { - o.Stream = s - } -} diff --git a/debug/log/stream.go b/debug/log/stream.go new file mode 100644 index 00000000..0effe358 --- /dev/null +++ b/debug/log/stream.go @@ -0,0 +1,20 @@ +package log + +type logStream struct { + stream <-chan Record + stop chan bool +} + +func (l *logStream) Chan() <-chan Record { + return l.stream +} + +func (l *logStream) Stop() error { + select { + case <-l.stop: + return nil + default: + close(l.stop) + } + return nil +} diff --git a/debug/service/handler/debug.go b/debug/service/handler/debug.go index 7f2f16e4..c0e26c5e 100644 --- a/debug/service/handler/debug.go +++ b/debug/service/handler/debug.go @@ -71,10 +71,13 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error { // the connection stays open until some timeout expires // or something like that; that means the map of streams // might end up leaking memory if not cleaned up properly - records, stop := d.log.Stream() - defer close(stop) + lgStream, err := d.log.Stream() + if err != nil { + return err + } + defer lgStream.Stop() - for record := range records { + for record := range lgStream.Chan() { if err := d.sendRecord(record, stream); err != nil { return err } @@ -85,7 +88,10 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error { } // get the log records - records := d.log.Read(options...) + records, err := d.log.Read(options...) + if err != nil { + return err + } // send all the logs downstream for _, record := range records { diff --git a/debug/service/log.go b/debug/service/log.go index 647b349a..95ad0f3c 100644 --- a/debug/service/log.go +++ b/debug/service/log.go @@ -1,6 +1,8 @@ package service import ( + "time" + "github.com/micro/go-micro/debug" "github.com/micro/go-micro/debug/log" ) @@ -10,50 +12,36 @@ type serviceLog struct { } // Read reads log entries from the logger -func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record { - // TODO: parse opts - stream, err := s.Client.Log(opts...) - if err != nil { - return nil +func (s *serviceLog) Read(opts ...log.ReadOption) ([]log.Record, error) { + var options log.ReadOptions + for _, o := range opts { + o(&options) } + + stream, err := s.Client.Log(options.Since, options.Count, false) + if err != nil { + return nil, err + } + defer stream.Stop() + // stream the records until nothing is left var records []log.Record - for record := range stream { + + for record := range stream.Chan() { records = append(records, record) } - return records + + return records, nil } // There is no write support -func (s *serviceLog) Write(r log.Record) { - return +func (s *serviceLog) Write(r log.Record) error { + return nil } // Stream log records -func (s *serviceLog) Stream() (<-chan log.Record, chan bool) { - stop := make(chan bool) - stream, err := s.Client.Log(log.Stream(true)) - if err != nil { - // return a closed stream - deadStream := make(chan log.Record) - close(deadStream) - return deadStream, stop - } - - newStream := make(chan log.Record, 128) - - go func() { - for { - select { - case rec := <-stream: - newStream <- rec - case <-stop: - return - } - } - }() - - return newStream, stop +func (s *serviceLog) Stream() (log.Stream, error) { + return s.Client.Log(time.Time{}, 0, true) } // NewLog returns a new log interface diff --git a/debug/service/service.go b/debug/service/service.go index 1da11c6e..805a84a3 100644 --- a/debug/service/service.go +++ b/debug/service/service.go @@ -27,42 +27,40 @@ func NewClient(name string) *debugClient { } } -// Logs queries the service logs and returns a channel to read the logs from -func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) { - var options log.ReadOptions - // initialize the read options - for _, o := range opts { - o(&options) - } - +// Logs queries the services logs and returns a channel to read the logs from +func (d *debugClient) Log(since time.Time, count int, stream bool) (log.Stream, error) { req := &pb.LogRequest{} - if !options.Since.IsZero() { - req.Since = options.Since.Unix() + if !since.IsZero() { + req.Since = since.Unix() } - if options.Count > 0 { - req.Count = int64(options.Count) + if count > 0 { + req.Count = int64(count) } - req.Stream = options.Stream + // set whether to stream + req.Stream = stream // get the log stream - stream, err := d.Client.Log(context.Background(), req) + serverStream, err := d.Client.Log(context.Background(), req) if err != nil { return nil, fmt.Errorf("failed getting log stream: %s", err) } - // log channel for streaming logs - logChan := make(chan log.Record) + lg := &logStream{ + stream: make(chan log.Record), + stop: make(chan bool), + } // go stream logs - go d.streamLogs(logChan, stream) + go d.streamLogs(lg, serverStream) - return logChan, nil + return lg, nil } -func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { +func (d *debugClient) streamLogs(lg *logStream, stream pb.Debug_LogService) { defer stream.Close() + defer lg.Stop() for { resp, err := stream.Recv() @@ -81,8 +79,10 @@ func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogSer Metadata: metadata, } - logChan <- record + select { + case <-lg.stop: + return + case lg.stream <- record: + } } - - close(logChan) } diff --git a/debug/service/stream.go b/debug/service/stream.go new file mode 100644 index 00000000..d7bd9765 --- /dev/null +++ b/debug/service/stream.go @@ -0,0 +1,25 @@ +package service + +import ( + "github.com/micro/go-micro/debug/log" +) + +type logStream struct { + stream chan log.Record + stop chan bool +} + +func (l *logStream) Chan() <-chan log.Record { + return l.stream +} + +func (l *logStream) Stop() error { + select { + case <-l.stop: + return nil + default: + close(l.stream) + close(l.stop) + } + return nil +}