| @@ -33,13 +33,14 @@ func NewLog(opts ...Option) Log { | |||||||
| } | } | ||||||
|  |  | ||||||
| // Write writes logs into logger | // Write writes logs into logger | ||||||
| func (l *defaultLog) Write(r Record) { | func (l *defaultLog) Write(r Record) error { | ||||||
| 	golog.Print(r.Value) | 	golog.Print(r.Value) | ||||||
| 	l.Buffer.Put(fmt.Sprint(r.Value)) | 	l.Buffer.Put(fmt.Sprint(r.Value)) | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Read reads logs and returns them | // Read reads logs and returns them | ||||||
| func (l *defaultLog) Read(opts ...ReadOption) []Record { | func (l *defaultLog) Read(opts ...ReadOption) ([]Record, error) { | ||||||
| 	options := ReadOptions{} | 	options := ReadOptions{} | ||||||
| 	// initialize the read options | 	// initialize the read options | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| @@ -78,12 +79,12 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { | |||||||
| 		records = append(records, record) | 		records = append(records, record) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return records | 	return records, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Stream returns channel for reading log records | // Stream returns channel for reading log records | ||||||
| // along with a stop channel, close it when done | // along with a stop channel, close it when done | ||||||
| func (l *defaultLog) Stream() (<-chan Record, chan bool) { | func (l *defaultLog) Stream() (Stream, error) { | ||||||
| 	// get stream channel from ring buffer | 	// get stream channel from ring buffer | ||||||
| 	stream, stop := l.Buffer.Stream() | 	stream, stop := l.Buffer.Stream() | ||||||
| 	// make a buffered channel | 	// make a buffered channel | ||||||
| @@ -111,5 +112,8 @@ func (l *defaultLog) Stream() (<-chan Record, chan bool) { | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	return records, stop | 	return &logStream{ | ||||||
|  | 		stream: records, | ||||||
|  | 		stop:   stop, | ||||||
|  | 	}, nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -23,7 +23,7 @@ func TestLogger(t *testing.T) { | |||||||
|  |  | ||||||
| 	// Check if the logs are stored in the logger ring buffer | 	// Check if the logs are stored in the logger ring buffer | ||||||
| 	expected := []string{"foobar", "foo bar"} | 	expected := []string{"foobar", "foo bar"} | ||||||
| 	entries := DefaultLog.Read(Count(len(expected))) | 	entries, _ := DefaultLog.Read(Count(len(expected))) | ||||||
| 	for i, entry := range entries { | 	for i, entry := range entries { | ||||||
| 		if !reflect.DeepEqual(entry.Value, expected[i]) { | 		if !reflect.DeepEqual(entry.Value, expected[i]) { | ||||||
| 			t.Errorf("expected %s, got %s", expected[i], entry.Value) | 			t.Errorf("expected %s, got %s", expected[i], entry.Value) | ||||||
|   | |||||||
							
								
								
									
										35
									
								
								debug/log/level.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								debug/log/level.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | |||||||
|  | // Package log provides debug logging | ||||||
|  | package log | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"os" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // level is a log level | ||||||
|  | type Level int | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	LevelFatal Level = iota | ||||||
|  | 	LevelError | ||||||
|  | 	LevelInfo | ||||||
|  | 	LevelWarn | ||||||
|  | 	LevelDebug | ||||||
|  | 	LevelTrace | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	switch os.Getenv("MICRO_LOG_LEVEL") { | ||||||
|  | 	case "trace": | ||||||
|  | 		DefaultLevel = LevelTrace | ||||||
|  | 	case "debug": | ||||||
|  | 		DefaultLevel = LevelDebug | ||||||
|  | 	case "warn": | ||||||
|  | 		DefaultLevel = LevelWarn | ||||||
|  | 	case "info": | ||||||
|  | 		DefaultLevel = LevelInfo | ||||||
|  | 	case "error": | ||||||
|  | 		DefaultLevel = LevelError | ||||||
|  | 	case "fatal": | ||||||
|  | 		DefaultLevel = LevelFatal | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										151
									
								
								debug/log/log.go
									
									
									
									
									
								
							
							
						
						
									
										151
									
								
								debug/log/log.go
									
									
									
									
									
								
							| @@ -2,8 +2,6 @@ | |||||||
| package log | package log | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"fmt" |  | ||||||
| 	"os" |  | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -19,11 +17,11 @@ var ( | |||||||
| // Log is event log | // Log is event log | ||||||
| type Log interface { | type Log interface { | ||||||
| 	// Read reads log entries from the logger | 	// Read reads log entries from the logger | ||||||
| 	Read(...ReadOption) []Record | 	Read(...ReadOption) ([]Record, error) | ||||||
| 	// Write writes records to log | 	// Write writes records to log | ||||||
| 	Write(Record) | 	Write(Record) error | ||||||
| 	// Stream log records | 	// Stream log records | ||||||
| 	Stream() (<-chan Record, chan bool) | 	Stream() (Stream, error) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Record is log record entry | // Record is log record entry | ||||||
| @@ -36,144 +34,7 @@ type Record struct { | |||||||
| 	Metadata map[string]string | 	Metadata map[string]string | ||||||
| } | } | ||||||
|  |  | ||||||
| // level is a log level | type Stream interface { | ||||||
| type Level int | 	Chan() <-chan Record | ||||||
|  | 	Stop() error | ||||||
| const ( |  | ||||||
| 	LevelFatal Level = iota |  | ||||||
| 	LevelError |  | ||||||
| 	LevelInfo |  | ||||||
| 	LevelWarn |  | ||||||
| 	LevelDebug |  | ||||||
| 	LevelTrace |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| func init() { |  | ||||||
| 	switch os.Getenv("MICRO_LOG_LEVEL") { |  | ||||||
| 	case "trace": |  | ||||||
| 		DefaultLevel = LevelTrace |  | ||||||
| 	case "debug": |  | ||||||
| 		DefaultLevel = LevelDebug |  | ||||||
| 	case "warn": |  | ||||||
| 		DefaultLevel = LevelWarn |  | ||||||
| 	case "info": |  | ||||||
| 		DefaultLevel = LevelInfo |  | ||||||
| 	case "error": |  | ||||||
| 		DefaultLevel = LevelError |  | ||||||
| 	case "fatal": |  | ||||||
| 		DefaultLevel = LevelFatal |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func log(v ...interface{}) { |  | ||||||
| 	if len(prefix) > 0 { |  | ||||||
| 		DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)}) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	DefaultLog.Write(Record{Value: fmt.Sprint(v...)}) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func logf(format string, v ...interface{}) { |  | ||||||
| 	if len(prefix) > 0 { |  | ||||||
| 		format = prefix + " " + format |  | ||||||
| 	} |  | ||||||
| 	DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)}) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // WithLevel logs with the level specified |  | ||||||
| func WithLevel(l Level, v ...interface{}) { |  | ||||||
| 	if l > DefaultLevel { |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	log(v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // WithLevel logs with the level specified |  | ||||||
| func WithLevelf(l Level, format string, v ...interface{}) { |  | ||||||
| 	if l > DefaultLevel { |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	logf(format, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Trace provides trace level logging |  | ||||||
| func Trace(v ...interface{}) { |  | ||||||
| 	WithLevel(LevelTrace, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Tracef provides trace level logging |  | ||||||
| func Tracef(format string, v ...interface{}) { |  | ||||||
| 	WithLevelf(LevelTrace, format, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Debug provides debug level logging |  | ||||||
| func Debug(v ...interface{}) { |  | ||||||
| 	WithLevel(LevelDebug, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Debugf provides debug level logging |  | ||||||
| func Debugf(format string, v ...interface{}) { |  | ||||||
| 	WithLevelf(LevelDebug, format, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Warn provides warn level logging |  | ||||||
| func Warn(v ...interface{}) { |  | ||||||
| 	WithLevel(LevelWarn, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Warnf provides warn level logging |  | ||||||
| func Warnf(format string, v ...interface{}) { |  | ||||||
| 	WithLevelf(LevelWarn, format, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Info provides info level logging |  | ||||||
| func Info(v ...interface{}) { |  | ||||||
| 	WithLevel(LevelInfo, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Infof provides info level logging |  | ||||||
| func Infof(format string, v ...interface{}) { |  | ||||||
| 	WithLevelf(LevelInfo, format, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Error provides warn level logging |  | ||||||
| func Error(v ...interface{}) { |  | ||||||
| 	WithLevel(LevelError, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Errorf provides warn level logging |  | ||||||
| func Errorf(format string, v ...interface{}) { |  | ||||||
| 	WithLevelf(LevelError, format, v...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Fatal logs with Log and then exits with os.Exit(1) |  | ||||||
| func Fatal(v ...interface{}) { |  | ||||||
| 	WithLevel(LevelFatal, v...) |  | ||||||
| 	os.Exit(1) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Fatalf logs with Logf and then exits with os.Exit(1) |  | ||||||
| func Fatalf(format string, v ...interface{}) { |  | ||||||
| 	WithLevelf(LevelFatal, format, v...) |  | ||||||
| 	os.Exit(1) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // SetLevel sets the log level |  | ||||||
| func SetLevel(l Level) { |  | ||||||
| 	DefaultLevel = l |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetLevel returns the current level |  | ||||||
| func GetLevel() Level { |  | ||||||
| 	return DefaultLevel |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Set a prefix for the logger |  | ||||||
| func SetPrefix(p string) { |  | ||||||
| 	prefix = p |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Set service name |  | ||||||
| func SetName(name string) { |  | ||||||
| 	prefix = fmt.Sprintf("[%s]", name) |  | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										120
									
								
								debug/log/logger.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										120
									
								
								debug/log/logger.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,120 @@ | |||||||
|  | // Package log provides debug logging | ||||||
|  | package log | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"os" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func log(v ...interface{}) { | ||||||
|  | 	if len(prefix) > 0 { | ||||||
|  | 		DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)}) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	DefaultLog.Write(Record{Value: fmt.Sprint(v...)}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func logf(format string, v ...interface{}) { | ||||||
|  | 	if len(prefix) > 0 { | ||||||
|  | 		format = prefix + " " + format | ||||||
|  | 	} | ||||||
|  | 	DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // WithLevel logs with the level specified | ||||||
|  | func WithLevel(l Level, v ...interface{}) { | ||||||
|  | 	if l > DefaultLevel { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	log(v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // WithLevel logs with the level specified | ||||||
|  | func WithLevelf(l Level, format string, v ...interface{}) { | ||||||
|  | 	if l > DefaultLevel { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	logf(format, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Trace provides trace level logging | ||||||
|  | func Trace(v ...interface{}) { | ||||||
|  | 	WithLevel(LevelTrace, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Tracef provides trace level logging | ||||||
|  | func Tracef(format string, v ...interface{}) { | ||||||
|  | 	WithLevelf(LevelTrace, format, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Debug provides debug level logging | ||||||
|  | func Debug(v ...interface{}) { | ||||||
|  | 	WithLevel(LevelDebug, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Debugf provides debug level logging | ||||||
|  | func Debugf(format string, v ...interface{}) { | ||||||
|  | 	WithLevelf(LevelDebug, format, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Warn provides warn level logging | ||||||
|  | func Warn(v ...interface{}) { | ||||||
|  | 	WithLevel(LevelWarn, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Warnf provides warn level logging | ||||||
|  | func Warnf(format string, v ...interface{}) { | ||||||
|  | 	WithLevelf(LevelWarn, format, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Info provides info level logging | ||||||
|  | func Info(v ...interface{}) { | ||||||
|  | 	WithLevel(LevelInfo, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Infof provides info level logging | ||||||
|  | func Infof(format string, v ...interface{}) { | ||||||
|  | 	WithLevelf(LevelInfo, format, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Error provides warn level logging | ||||||
|  | func Error(v ...interface{}) { | ||||||
|  | 	WithLevel(LevelError, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Errorf provides warn level logging | ||||||
|  | func Errorf(format string, v ...interface{}) { | ||||||
|  | 	WithLevelf(LevelError, format, v...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Fatal logs with Log and then exits with os.Exit(1) | ||||||
|  | func Fatal(v ...interface{}) { | ||||||
|  | 	WithLevel(LevelFatal, v...) | ||||||
|  | 	os.Exit(1) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Fatalf logs with Logf and then exits with os.Exit(1) | ||||||
|  | func Fatalf(format string, v ...interface{}) { | ||||||
|  | 	WithLevelf(LevelFatal, format, v...) | ||||||
|  | 	os.Exit(1) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // SetLevel sets the log level | ||||||
|  | func SetLevel(l Level) { | ||||||
|  | 	DefaultLevel = l | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // GetLevel returns the current level | ||||||
|  | func GetLevel() Level { | ||||||
|  | 	return DefaultLevel | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Set a prefix for the logger | ||||||
|  | func SetPrefix(p string) { | ||||||
|  | 	prefix = p | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Set service name | ||||||
|  | func SetName(name string) { | ||||||
|  | 	prefix = fmt.Sprintf("[%s]", name) | ||||||
|  | } | ||||||
| @@ -60,10 +60,3 @@ func Count(c int) ReadOption { | |||||||
| 		o.Count = c | 		o.Count = c | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // Stream requests continuous log stream |  | ||||||
| func Stream(s bool) ReadOption { |  | ||||||
| 	return func(o *ReadOptions) { |  | ||||||
| 		o.Stream = s |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|   | |||||||
							
								
								
									
										20
									
								
								debug/log/stream.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								debug/log/stream.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | |||||||
|  | package log | ||||||
|  |  | ||||||
|  | type logStream struct { | ||||||
|  | 	stream <-chan Record | ||||||
|  | 	stop   chan bool | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (l *logStream) Chan() <-chan Record { | ||||||
|  | 	return l.stream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (l *logStream) Stop() error { | ||||||
|  | 	select { | ||||||
|  | 	case <-l.stop: | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		close(l.stop) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
| @@ -71,10 +71,13 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error { | |||||||
| 		// the connection stays open until some timeout expires | 		// the connection stays open until some timeout expires | ||||||
| 		// or something like that; that means the map of streams | 		// or something like that; that means the map of streams | ||||||
| 		// might end up leaking memory if not cleaned up properly | 		// might end up leaking memory if not cleaned up properly | ||||||
| 		records, stop := d.log.Stream() | 		lgStream, err := d.log.Stream() | ||||||
| 		defer close(stop) | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		defer lgStream.Stop() | ||||||
|  |  | ||||||
| 		for record := range records { | 		for record := range lgStream.Chan() { | ||||||
| 			if err := d.sendRecord(record, stream); err != nil { | 			if err := d.sendRecord(record, stream); err != nil { | ||||||
| 				return err | 				return err | ||||||
| 			} | 			} | ||||||
| @@ -85,7 +88,10 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// get the log records | 	// get the log records | ||||||
| 	records := d.log.Read(options...) | 	records, err := d.log.Read(options...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// send all the logs downstream | 	// send all the logs downstream | ||||||
| 	for _, record := range records { | 	for _, record := range records { | ||||||
|   | |||||||
| @@ -1,6 +1,8 @@ | |||||||
| package service | package service | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/debug" | 	"github.com/micro/go-micro/debug" | ||||||
| 	"github.com/micro/go-micro/debug/log" | 	"github.com/micro/go-micro/debug/log" | ||||||
| ) | ) | ||||||
| @@ -10,50 +12,36 @@ type serviceLog struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| // Read reads log entries from the logger | // Read reads log entries from the logger | ||||||
| func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record { | func (s *serviceLog) Read(opts ...log.ReadOption) ([]log.Record, error) { | ||||||
| 	// TODO: parse opts | 	var options log.ReadOptions | ||||||
| 	stream, err := s.Client.Log(opts...) | 	for _, o := range opts { | ||||||
| 	if err != nil { | 		o(&options) | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	stream, err := s.Client.Log(options.Since, options.Count, false) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	defer stream.Stop() | ||||||
|  |  | ||||||
| 	// stream the records until nothing is left | 	// stream the records until nothing is left | ||||||
| 	var records []log.Record | 	var records []log.Record | ||||||
| 	for record := range stream { |  | ||||||
|  | 	for record := range stream.Chan() { | ||||||
| 		records = append(records, record) | 		records = append(records, record) | ||||||
| 	} | 	} | ||||||
| 	return records |  | ||||||
|  | 	return records, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // There is no write support | // There is no write support | ||||||
| func (s *serviceLog) Write(r log.Record) { | func (s *serviceLog) Write(r log.Record) error { | ||||||
| 	return | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Stream log records | // Stream log records | ||||||
| func (s *serviceLog) Stream() (<-chan log.Record, chan bool) { | func (s *serviceLog) Stream() (log.Stream, error) { | ||||||
| 	stop := make(chan bool) | 	return s.Client.Log(time.Time{}, 0, true) | ||||||
| 	stream, err := s.Client.Log(log.Stream(true)) |  | ||||||
| 	if err != nil { |  | ||||||
| 		// return a closed stream |  | ||||||
| 		deadStream := make(chan log.Record) |  | ||||||
| 		close(deadStream) |  | ||||||
| 		return deadStream, stop |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	newStream := make(chan log.Record, 128) |  | ||||||
|  |  | ||||||
| 	go func() { |  | ||||||
| 		for { |  | ||||||
| 			select { |  | ||||||
| 			case rec := <-stream: |  | ||||||
| 				newStream <- rec |  | ||||||
| 			case <-stop: |  | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	return newStream, stop |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewLog returns a new log interface | // NewLog returns a new log interface | ||||||
|   | |||||||
| @@ -27,42 +27,40 @@ func NewClient(name string) *debugClient { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // Logs queries the service logs and returns a channel to read the logs from | // Logs queries the services logs and returns a channel to read the logs from | ||||||
| func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) { | func (d *debugClient) Log(since time.Time, count int, stream bool) (log.Stream, error) { | ||||||
| 	var options log.ReadOptions |  | ||||||
| 	// initialize the read options |  | ||||||
| 	for _, o := range opts { |  | ||||||
| 		o(&options) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	req := &pb.LogRequest{} | 	req := &pb.LogRequest{} | ||||||
| 	if !options.Since.IsZero() { | 	if !since.IsZero() { | ||||||
| 		req.Since = options.Since.Unix() | 		req.Since = since.Unix() | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if options.Count > 0 { | 	if count > 0 { | ||||||
| 		req.Count = int64(options.Count) | 		req.Count = int64(count) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	req.Stream = options.Stream | 	// set whether to stream | ||||||
|  | 	req.Stream = stream | ||||||
|  |  | ||||||
| 	// get the log stream | 	// get the log stream | ||||||
| 	stream, err := d.Client.Log(context.Background(), req) | 	serverStream, err := d.Client.Log(context.Background(), req) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, fmt.Errorf("failed getting log stream: %s", err) | 		return nil, fmt.Errorf("failed getting log stream: %s", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// log channel for streaming logs | 	lg := &logStream{ | ||||||
| 	logChan := make(chan log.Record) | 		stream: make(chan log.Record), | ||||||
|  | 		stop:   make(chan bool), | ||||||
| 	// go stream logs |  | ||||||
| 	go d.streamLogs(logChan, stream) |  | ||||||
|  |  | ||||||
| 	return logChan, nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { | 	// go stream logs | ||||||
|  | 	go d.streamLogs(lg, serverStream) | ||||||
|  |  | ||||||
|  | 	return lg, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (d *debugClient) streamLogs(lg *logStream, stream pb.Debug_LogService) { | ||||||
| 	defer stream.Close() | 	defer stream.Close() | ||||||
|  | 	defer lg.Stop() | ||||||
|  |  | ||||||
| 	for { | 	for { | ||||||
| 		resp, err := stream.Recv() | 		resp, err := stream.Recv() | ||||||
| @@ -81,8 +79,10 @@ func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogSer | |||||||
| 			Metadata:  metadata, | 			Metadata:  metadata, | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		logChan <- record | 		select { | ||||||
|  | 		case <-lg.stop: | ||||||
|  | 			return | ||||||
|  | 		case lg.stream <- record: | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	close(logChan) |  | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										25
									
								
								debug/service/stream.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								debug/service/stream.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,25 @@ | |||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/micro/go-micro/debug/log" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type logStream struct { | ||||||
|  | 	stream chan log.Record | ||||||
|  | 	stop   chan bool | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (l *logStream) Chan() <-chan log.Record { | ||||||
|  | 	return l.stream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (l *logStream) Stop() error { | ||||||
|  | 	select { | ||||||
|  | 	case <-l.stop: | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		close(l.stream) | ||||||
|  | 		close(l.stop) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user