Move stream to interface

This commit is contained in:
Asim Aslam 2019-12-17 16:56:55 +00:00
parent 91e057440d
commit d2a3fd0b04
11 changed files with 269 additions and 217 deletions

View File

@ -33,13 +33,14 @@ func NewLog(opts ...Option) Log {
} }
// Write writes logs into logger // Write writes logs into logger
func (l *defaultLog) Write(r Record) { func (l *defaultLog) Write(r Record) error {
golog.Print(r.Value) golog.Print(r.Value)
l.Buffer.Put(fmt.Sprint(r.Value)) l.Buffer.Put(fmt.Sprint(r.Value))
return nil
} }
// Read reads logs and returns them // Read reads logs and returns them
func (l *defaultLog) Read(opts ...ReadOption) []Record { func (l *defaultLog) Read(opts ...ReadOption) ([]Record, error) {
options := ReadOptions{} options := ReadOptions{}
// initialize the read options // initialize the read options
for _, o := range opts { for _, o := range opts {
@ -78,12 +79,12 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
records = append(records, record) records = append(records, record)
} }
return records return records, nil
} }
// Stream returns channel for reading log records // Stream returns channel for reading log records
// along with a stop channel, close it when done // along with a stop channel, close it when done
func (l *defaultLog) Stream() (<-chan Record, chan bool) { func (l *defaultLog) Stream() (Stream, error) {
// get stream channel from ring buffer // get stream channel from ring buffer
stream, stop := l.Buffer.Stream() stream, stop := l.Buffer.Stream()
// make a buffered channel // make a buffered channel
@ -111,5 +112,8 @@ func (l *defaultLog) Stream() (<-chan Record, chan bool) {
} }
}() }()
return records, stop return &logStream{
stream: records,
stop: stop,
}, nil
} }

View File

@ -23,7 +23,7 @@ func TestLogger(t *testing.T) {
// Check if the logs are stored in the logger ring buffer // Check if the logs are stored in the logger ring buffer
expected := []string{"foobar", "foo bar"} expected := []string{"foobar", "foo bar"}
entries := DefaultLog.Read(Count(len(expected))) entries, _ := DefaultLog.Read(Count(len(expected)))
for i, entry := range entries { for i, entry := range entries {
if !reflect.DeepEqual(entry.Value, expected[i]) { if !reflect.DeepEqual(entry.Value, expected[i]) {
t.Errorf("expected %s, got %s", expected[i], entry.Value) t.Errorf("expected %s, got %s", expected[i], entry.Value)

35
debug/log/level.go Normal file
View File

@ -0,0 +1,35 @@
// Package log provides debug logging
package log
import (
"os"
)
// level is a log level
type Level int
const (
LevelFatal Level = iota
LevelError
LevelInfo
LevelWarn
LevelDebug
LevelTrace
)
func init() {
switch os.Getenv("MICRO_LOG_LEVEL") {
case "trace":
DefaultLevel = LevelTrace
case "debug":
DefaultLevel = LevelDebug
case "warn":
DefaultLevel = LevelWarn
case "info":
DefaultLevel = LevelInfo
case "error":
DefaultLevel = LevelError
case "fatal":
DefaultLevel = LevelFatal
}
}

View File

@ -2,8 +2,6 @@
package log package log
import ( import (
"fmt"
"os"
"time" "time"
) )
@ -19,11 +17,11 @@ var (
// Log is event log // Log is event log
type Log interface { type Log interface {
// Read reads log entries from the logger // Read reads log entries from the logger
Read(...ReadOption) []Record Read(...ReadOption) ([]Record, error)
// Write writes records to log // Write writes records to log
Write(Record) Write(Record) error
// Stream log records // Stream log records
Stream() (<-chan Record, chan bool) Stream() (Stream, error)
} }
// Record is log record entry // Record is log record entry
@ -36,144 +34,7 @@ type Record struct {
Metadata map[string]string Metadata map[string]string
} }
// level is a log level type Stream interface {
type Level int Chan() <-chan Record
Stop() error
const (
LevelFatal Level = iota
LevelError
LevelInfo
LevelWarn
LevelDebug
LevelTrace
)
func init() {
switch os.Getenv("MICRO_LOG_LEVEL") {
case "trace":
DefaultLevel = LevelTrace
case "debug":
DefaultLevel = LevelDebug
case "warn":
DefaultLevel = LevelWarn
case "info":
DefaultLevel = LevelInfo
case "error":
DefaultLevel = LevelError
case "fatal":
DefaultLevel = LevelFatal
}
}
func log(v ...interface{}) {
if len(prefix) > 0 {
DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)})
return
}
DefaultLog.Write(Record{Value: fmt.Sprint(v...)})
}
func logf(format string, v ...interface{}) {
if len(prefix) > 0 {
format = prefix + " " + format
}
DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)})
}
// WithLevel logs with the level specified
func WithLevel(l Level, v ...interface{}) {
if l > DefaultLevel {
return
}
log(v...)
}
// WithLevel logs with the level specified
func WithLevelf(l Level, format string, v ...interface{}) {
if l > DefaultLevel {
return
}
logf(format, v...)
}
// Trace provides trace level logging
func Trace(v ...interface{}) {
WithLevel(LevelTrace, v...)
}
// Tracef provides trace level logging
func Tracef(format string, v ...interface{}) {
WithLevelf(LevelTrace, format, v...)
}
// Debug provides debug level logging
func Debug(v ...interface{}) {
WithLevel(LevelDebug, v...)
}
// Debugf provides debug level logging
func Debugf(format string, v ...interface{}) {
WithLevelf(LevelDebug, format, v...)
}
// Warn provides warn level logging
func Warn(v ...interface{}) {
WithLevel(LevelWarn, v...)
}
// Warnf provides warn level logging
func Warnf(format string, v ...interface{}) {
WithLevelf(LevelWarn, format, v...)
}
// Info provides info level logging
func Info(v ...interface{}) {
WithLevel(LevelInfo, v...)
}
// Infof provides info level logging
func Infof(format string, v ...interface{}) {
WithLevelf(LevelInfo, format, v...)
}
// Error provides warn level logging
func Error(v ...interface{}) {
WithLevel(LevelError, v...)
}
// Errorf provides warn level logging
func Errorf(format string, v ...interface{}) {
WithLevelf(LevelError, format, v...)
}
// Fatal logs with Log and then exits with os.Exit(1)
func Fatal(v ...interface{}) {
WithLevel(LevelFatal, v...)
os.Exit(1)
}
// Fatalf logs with Logf and then exits with os.Exit(1)
func Fatalf(format string, v ...interface{}) {
WithLevelf(LevelFatal, format, v...)
os.Exit(1)
}
// SetLevel sets the log level
func SetLevel(l Level) {
DefaultLevel = l
}
// GetLevel returns the current level
func GetLevel() Level {
return DefaultLevel
}
// Set a prefix for the logger
func SetPrefix(p string) {
prefix = p
}
// Set service name
func SetName(name string) {
prefix = fmt.Sprintf("[%s]", name)
} }

120
debug/log/logger.go Normal file
View File

@ -0,0 +1,120 @@
// Package log provides debug logging
package log
import (
"fmt"
"os"
)
func log(v ...interface{}) {
if len(prefix) > 0 {
DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)})
return
}
DefaultLog.Write(Record{Value: fmt.Sprint(v...)})
}
func logf(format string, v ...interface{}) {
if len(prefix) > 0 {
format = prefix + " " + format
}
DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)})
}
// WithLevel logs with the level specified
func WithLevel(l Level, v ...interface{}) {
if l > DefaultLevel {
return
}
log(v...)
}
// WithLevel logs with the level specified
func WithLevelf(l Level, format string, v ...interface{}) {
if l > DefaultLevel {
return
}
logf(format, v...)
}
// Trace provides trace level logging
func Trace(v ...interface{}) {
WithLevel(LevelTrace, v...)
}
// Tracef provides trace level logging
func Tracef(format string, v ...interface{}) {
WithLevelf(LevelTrace, format, v...)
}
// Debug provides debug level logging
func Debug(v ...interface{}) {
WithLevel(LevelDebug, v...)
}
// Debugf provides debug level logging
func Debugf(format string, v ...interface{}) {
WithLevelf(LevelDebug, format, v...)
}
// Warn provides warn level logging
func Warn(v ...interface{}) {
WithLevel(LevelWarn, v...)
}
// Warnf provides warn level logging
func Warnf(format string, v ...interface{}) {
WithLevelf(LevelWarn, format, v...)
}
// Info provides info level logging
func Info(v ...interface{}) {
WithLevel(LevelInfo, v...)
}
// Infof provides info level logging
func Infof(format string, v ...interface{}) {
WithLevelf(LevelInfo, format, v...)
}
// Error provides warn level logging
func Error(v ...interface{}) {
WithLevel(LevelError, v...)
}
// Errorf provides warn level logging
func Errorf(format string, v ...interface{}) {
WithLevelf(LevelError, format, v...)
}
// Fatal logs with Log and then exits with os.Exit(1)
func Fatal(v ...interface{}) {
WithLevel(LevelFatal, v...)
os.Exit(1)
}
// Fatalf logs with Logf and then exits with os.Exit(1)
func Fatalf(format string, v ...interface{}) {
WithLevelf(LevelFatal, format, v...)
os.Exit(1)
}
// SetLevel sets the log level
func SetLevel(l Level) {
DefaultLevel = l
}
// GetLevel returns the current level
func GetLevel() Level {
return DefaultLevel
}
// Set a prefix for the logger
func SetPrefix(p string) {
prefix = p
}
// Set service name
func SetName(name string) {
prefix = fmt.Sprintf("[%s]", name)
}

View File

@ -60,10 +60,3 @@ func Count(c int) ReadOption {
o.Count = c o.Count = c
} }
} }
// Stream requests continuous log stream
func Stream(s bool) ReadOption {
return func(o *ReadOptions) {
o.Stream = s
}
}

20
debug/log/stream.go Normal file
View File

@ -0,0 +1,20 @@
package log
type logStream struct {
stream <-chan Record
stop chan bool
}
func (l *logStream) Chan() <-chan Record {
return l.stream
}
func (l *logStream) Stop() error {
select {
case <-l.stop:
return nil
default:
close(l.stop)
}
return nil
}

View File

@ -71,10 +71,13 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
// the connection stays open until some timeout expires // the connection stays open until some timeout expires
// or something like that; that means the map of streams // or something like that; that means the map of streams
// might end up leaking memory if not cleaned up properly // might end up leaking memory if not cleaned up properly
records, stop := d.log.Stream() lgStream, err := d.log.Stream()
defer close(stop) if err != nil {
return err
}
defer lgStream.Stop()
for record := range records { for record := range lgStream.Chan() {
if err := d.sendRecord(record, stream); err != nil { if err := d.sendRecord(record, stream); err != nil {
return err return err
} }
@ -85,7 +88,10 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
} }
// get the log records // get the log records
records := d.log.Read(options...) records, err := d.log.Read(options...)
if err != nil {
return err
}
// send all the logs downstream // send all the logs downstream
for _, record := range records { for _, record := range records {

View File

@ -1,6 +1,8 @@
package service package service
import ( import (
"time"
"github.com/micro/go-micro/debug" "github.com/micro/go-micro/debug"
"github.com/micro/go-micro/debug/log" "github.com/micro/go-micro/debug/log"
) )
@ -10,50 +12,36 @@ type serviceLog struct {
} }
// Read reads log entries from the logger // Read reads log entries from the logger
func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record { func (s *serviceLog) Read(opts ...log.ReadOption) ([]log.Record, error) {
// TODO: parse opts var options log.ReadOptions
stream, err := s.Client.Log(opts...) for _, o := range opts {
if err != nil { o(&options)
return nil
} }
stream, err := s.Client.Log(options.Since, options.Count, false)
if err != nil {
return nil, err
}
defer stream.Stop()
// stream the records until nothing is left // stream the records until nothing is left
var records []log.Record var records []log.Record
for record := range stream {
for record := range stream.Chan() {
records = append(records, record) records = append(records, record)
} }
return records
return records, nil
} }
// There is no write support // There is no write support
func (s *serviceLog) Write(r log.Record) { func (s *serviceLog) Write(r log.Record) error {
return return nil
} }
// Stream log records // Stream log records
func (s *serviceLog) Stream() (<-chan log.Record, chan bool) { func (s *serviceLog) Stream() (log.Stream, error) {
stop := make(chan bool) return s.Client.Log(time.Time{}, 0, true)
stream, err := s.Client.Log(log.Stream(true))
if err != nil {
// return a closed stream
deadStream := make(chan log.Record)
close(deadStream)
return deadStream, stop
}
newStream := make(chan log.Record, 128)
go func() {
for {
select {
case rec := <-stream:
newStream <- rec
case <-stop:
return
}
}
}()
return newStream, stop
} }
// NewLog returns a new log interface // NewLog returns a new log interface

View File

@ -27,42 +27,40 @@ func NewClient(name string) *debugClient {
} }
} }
// Logs queries the service logs and returns a channel to read the logs from // Logs queries the services logs and returns a channel to read the logs from
func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) { func (d *debugClient) Log(since time.Time, count int, stream bool) (log.Stream, error) {
var options log.ReadOptions
// initialize the read options
for _, o := range opts {
o(&options)
}
req := &pb.LogRequest{} req := &pb.LogRequest{}
if !options.Since.IsZero() { if !since.IsZero() {
req.Since = options.Since.Unix() req.Since = since.Unix()
} }
if options.Count > 0 { if count > 0 {
req.Count = int64(options.Count) req.Count = int64(count)
} }
req.Stream = options.Stream // set whether to stream
req.Stream = stream
// get the log stream // get the log stream
stream, err := d.Client.Log(context.Background(), req) serverStream, err := d.Client.Log(context.Background(), req)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed getting log stream: %s", err) return nil, fmt.Errorf("failed getting log stream: %s", err)
} }
// log channel for streaming logs lg := &logStream{
logChan := make(chan log.Record) stream: make(chan log.Record),
stop: make(chan bool),
// go stream logs
go d.streamLogs(logChan, stream)
return logChan, nil
} }
func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { // go stream logs
go d.streamLogs(lg, serverStream)
return lg, nil
}
func (d *debugClient) streamLogs(lg *logStream, stream pb.Debug_LogService) {
defer stream.Close() defer stream.Close()
defer lg.Stop()
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
@ -81,8 +79,10 @@ func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogSer
Metadata: metadata, Metadata: metadata,
} }
logChan <- record select {
case <-lg.stop:
return
case lg.stream <- record:
}
} }
close(logChan)
} }

25
debug/service/stream.go Normal file
View File

@ -0,0 +1,25 @@
package service
import (
"github.com/micro/go-micro/debug/log"
)
type logStream struct {
stream chan log.Record
stop chan bool
}
func (l *logStream) Chan() <-chan log.Record {
return l.stream
}
func (l *logStream) Stop() error {
select {
case <-l.stop:
return nil
default:
close(l.stream)
close(l.stop)
}
return nil
}