Merge branch 'master' of https://github.com/micro/go-micro into kubernetes-logging
This commit is contained in:
commit
b7ac62f7d2
@ -33,13 +33,14 @@ func NewLog(opts ...Option) Log {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write writes logs into logger
|
// Write writes logs into logger
|
||||||
func (l *defaultLog) Write(r Record) {
|
func (l *defaultLog) Write(r Record) error {
|
||||||
golog.Print(r.Value)
|
golog.Print(r.Value)
|
||||||
l.Buffer.Put(fmt.Sprint(r.Value))
|
l.Buffer.Put(fmt.Sprint(r.Value))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read reads logs and returns them
|
// Read reads logs and returns them
|
||||||
func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
func (l *defaultLog) Read(opts ...ReadOption) ([]Record, error) {
|
||||||
options := ReadOptions{}
|
options := ReadOptions{}
|
||||||
// initialize the read options
|
// initialize the read options
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
@ -78,12 +79,12 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
|||||||
records = append(records, record)
|
records = append(records, record)
|
||||||
}
|
}
|
||||||
|
|
||||||
return records
|
return records, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream returns channel for reading log records
|
// Stream returns channel for reading log records
|
||||||
// along with a stop channel, close it when done
|
// along with a stop channel, close it when done
|
||||||
func (l *defaultLog) Stream() (<-chan Record, chan bool) {
|
func (l *defaultLog) Stream() (Stream, error) {
|
||||||
// get stream channel from ring buffer
|
// get stream channel from ring buffer
|
||||||
stream, stop := l.Buffer.Stream()
|
stream, stop := l.Buffer.Stream()
|
||||||
// make a buffered channel
|
// make a buffered channel
|
||||||
@ -111,5 +112,8 @@ func (l *defaultLog) Stream() (<-chan Record, chan bool) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return records, stop
|
return &logStream{
|
||||||
|
stream: records,
|
||||||
|
stop: stop,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ func TestLogger(t *testing.T) {
|
|||||||
|
|
||||||
// Check if the logs are stored in the logger ring buffer
|
// Check if the logs are stored in the logger ring buffer
|
||||||
expected := []string{"foobar", "foo bar"}
|
expected := []string{"foobar", "foo bar"}
|
||||||
entries := DefaultLog.Read(Count(len(expected)))
|
entries, _ := DefaultLog.Read(Count(len(expected)))
|
||||||
for i, entry := range entries {
|
for i, entry := range entries {
|
||||||
if !reflect.DeepEqual(entry.Value, expected[i]) {
|
if !reflect.DeepEqual(entry.Value, expected[i]) {
|
||||||
t.Errorf("expected %s, got %s", expected[i], entry.Value)
|
t.Errorf("expected %s, got %s", expected[i], entry.Value)
|
||||||
|
35
debug/log/level.go
Normal file
35
debug/log/level.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
// Package log provides debug logging
|
||||||
|
package log
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// level is a log level
|
||||||
|
type Level int
|
||||||
|
|
||||||
|
const (
|
||||||
|
LevelFatal Level = iota
|
||||||
|
LevelError
|
||||||
|
LevelInfo
|
||||||
|
LevelWarn
|
||||||
|
LevelDebug
|
||||||
|
LevelTrace
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
switch os.Getenv("MICRO_LOG_LEVEL") {
|
||||||
|
case "trace":
|
||||||
|
DefaultLevel = LevelTrace
|
||||||
|
case "debug":
|
||||||
|
DefaultLevel = LevelDebug
|
||||||
|
case "warn":
|
||||||
|
DefaultLevel = LevelWarn
|
||||||
|
case "info":
|
||||||
|
DefaultLevel = LevelInfo
|
||||||
|
case "error":
|
||||||
|
DefaultLevel = LevelError
|
||||||
|
case "fatal":
|
||||||
|
DefaultLevel = LevelFatal
|
||||||
|
}
|
||||||
|
}
|
@ -2,8 +2,6 @@
|
|||||||
package log
|
package log
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,11 +17,11 @@ var (
|
|||||||
// Log is event log
|
// Log is event log
|
||||||
type Log interface {
|
type Log interface {
|
||||||
// Read reads log entries from the logger
|
// Read reads log entries from the logger
|
||||||
Read(...ReadOption) []Record
|
Read(...ReadOption) ([]Record, error)
|
||||||
// Write writes records to log
|
// Write writes records to log
|
||||||
Write(Record)
|
Write(Record) error
|
||||||
// Stream log records
|
// Stream log records
|
||||||
Stream() (<-chan Record, chan bool)
|
Stream() (Stream, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record is log record entry
|
// Record is log record entry
|
||||||
@ -36,6 +34,7 @@ type Record struct {
|
|||||||
Metadata map[string]string `json:"metadata"`
|
Metadata map[string]string `json:"metadata"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
// Level is a log level
|
// Level is a log level
|
||||||
type Level int
|
type Level int
|
||||||
|
|
||||||
@ -176,4 +175,9 @@ func SetPrefix(p string) {
|
|||||||
// Set service name
|
// Set service name
|
||||||
func SetName(name string) {
|
func SetName(name string) {
|
||||||
prefix = fmt.Sprintf("[%s]", name)
|
prefix = fmt.Sprintf("[%s]", name)
|
||||||
|
=======
|
||||||
|
type Stream interface {
|
||||||
|
Chan() <-chan Record
|
||||||
|
Stop() error
|
||||||
|
>>>>>>> 50d5c6402b1e2bea64476a969b613b7c685d6f8e
|
||||||
}
|
}
|
||||||
|
120
debug/log/logger.go
Normal file
120
debug/log/logger.go
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
// Package log provides debug logging
|
||||||
|
package log
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
func log(v ...interface{}) {
|
||||||
|
if len(prefix) > 0 {
|
||||||
|
DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
DefaultLog.Write(Record{Value: fmt.Sprint(v...)})
|
||||||
|
}
|
||||||
|
|
||||||
|
func logf(format string, v ...interface{}) {
|
||||||
|
if len(prefix) > 0 {
|
||||||
|
format = prefix + " " + format
|
||||||
|
}
|
||||||
|
DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLevel logs with the level specified
|
||||||
|
func WithLevel(l Level, v ...interface{}) {
|
||||||
|
if l > DefaultLevel {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log(v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLevel logs with the level specified
|
||||||
|
func WithLevelf(l Level, format string, v ...interface{}) {
|
||||||
|
if l > DefaultLevel {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logf(format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trace provides trace level logging
|
||||||
|
func Trace(v ...interface{}) {
|
||||||
|
WithLevel(LevelTrace, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tracef provides trace level logging
|
||||||
|
func Tracef(format string, v ...interface{}) {
|
||||||
|
WithLevelf(LevelTrace, format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debug provides debug level logging
|
||||||
|
func Debug(v ...interface{}) {
|
||||||
|
WithLevel(LevelDebug, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debugf provides debug level logging
|
||||||
|
func Debugf(format string, v ...interface{}) {
|
||||||
|
WithLevelf(LevelDebug, format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warn provides warn level logging
|
||||||
|
func Warn(v ...interface{}) {
|
||||||
|
WithLevel(LevelWarn, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warnf provides warn level logging
|
||||||
|
func Warnf(format string, v ...interface{}) {
|
||||||
|
WithLevelf(LevelWarn, format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Info provides info level logging
|
||||||
|
func Info(v ...interface{}) {
|
||||||
|
WithLevel(LevelInfo, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Infof provides info level logging
|
||||||
|
func Infof(format string, v ...interface{}) {
|
||||||
|
WithLevelf(LevelInfo, format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error provides warn level logging
|
||||||
|
func Error(v ...interface{}) {
|
||||||
|
WithLevel(LevelError, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Errorf provides warn level logging
|
||||||
|
func Errorf(format string, v ...interface{}) {
|
||||||
|
WithLevelf(LevelError, format, v...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fatal logs with Log and then exits with os.Exit(1)
|
||||||
|
func Fatal(v ...interface{}) {
|
||||||
|
WithLevel(LevelFatal, v...)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fatalf logs with Logf and then exits with os.Exit(1)
|
||||||
|
func Fatalf(format string, v ...interface{}) {
|
||||||
|
WithLevelf(LevelFatal, format, v...)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetLevel sets the log level
|
||||||
|
func SetLevel(l Level) {
|
||||||
|
DefaultLevel = l
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLevel returns the current level
|
||||||
|
func GetLevel() Level {
|
||||||
|
return DefaultLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set a prefix for the logger
|
||||||
|
func SetPrefix(p string) {
|
||||||
|
prefix = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set service name
|
||||||
|
func SetName(name string) {
|
||||||
|
prefix = fmt.Sprintf("[%s]", name)
|
||||||
|
}
|
@ -60,10 +60,3 @@ func Count(c int) ReadOption {
|
|||||||
o.Count = c
|
o.Count = c
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream requests continuous log stream
|
|
||||||
func Stream(s bool) ReadOption {
|
|
||||||
return func(o *ReadOptions) {
|
|
||||||
o.Stream = s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
20
debug/log/stream.go
Normal file
20
debug/log/stream.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package log
|
||||||
|
|
||||||
|
type logStream struct {
|
||||||
|
stream <-chan Record
|
||||||
|
stop chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logStream) Chan() <-chan Record {
|
||||||
|
return l.stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logStream) Stop() error {
|
||||||
|
select {
|
||||||
|
case <-l.stop:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
close(l.stop)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -71,10 +71,13 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
|
|||||||
// the connection stays open until some timeout expires
|
// the connection stays open until some timeout expires
|
||||||
// or something like that; that means the map of streams
|
// or something like that; that means the map of streams
|
||||||
// might end up leaking memory if not cleaned up properly
|
// might end up leaking memory if not cleaned up properly
|
||||||
records, stop := d.log.Stream()
|
lgStream, err := d.log.Stream()
|
||||||
defer close(stop)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer lgStream.Stop()
|
||||||
|
|
||||||
for record := range records {
|
for record := range lgStream.Chan() {
|
||||||
if err := d.sendRecord(record, stream); err != nil {
|
if err := d.sendRecord(record, stream); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -85,7 +88,10 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// get the log records
|
// get the log records
|
||||||
records := d.log.Read(options...)
|
records, err := d.log.Read(options...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// send all the logs downstream
|
// send all the logs downstream
|
||||||
for _, record := range records {
|
for _, record := range records {
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-micro/debug"
|
"github.com/micro/go-micro/debug"
|
||||||
"github.com/micro/go-micro/debug/log"
|
"github.com/micro/go-micro/debug/log"
|
||||||
)
|
)
|
||||||
@ -10,50 +12,36 @@ type serviceLog struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Read reads log entries from the logger
|
// Read reads log entries from the logger
|
||||||
func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record {
|
func (s *serviceLog) Read(opts ...log.ReadOption) ([]log.Record, error) {
|
||||||
// TODO: parse opts
|
var options log.ReadOptions
|
||||||
stream, err := s.Client.Log(opts...)
|
for _, o := range opts {
|
||||||
if err != nil {
|
o(&options)
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream, err := s.Client.Log(options.Since, options.Count, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer stream.Stop()
|
||||||
|
|
||||||
// stream the records until nothing is left
|
// stream the records until nothing is left
|
||||||
var records []log.Record
|
var records []log.Record
|
||||||
for record := range stream {
|
|
||||||
|
for record := range stream.Chan() {
|
||||||
records = append(records, record)
|
records = append(records, record)
|
||||||
}
|
}
|
||||||
return records
|
|
||||||
|
return records, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// There is no write support
|
// There is no write support
|
||||||
func (s *serviceLog) Write(r log.Record) {
|
func (s *serviceLog) Write(r log.Record) error {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream log records
|
// Stream log records
|
||||||
func (s *serviceLog) Stream() (<-chan log.Record, chan bool) {
|
func (s *serviceLog) Stream() (log.Stream, error) {
|
||||||
stop := make(chan bool)
|
return s.Client.Log(time.Time{}, 0, true)
|
||||||
stream, err := s.Client.Log(log.Stream(true))
|
|
||||||
if err != nil {
|
|
||||||
// return a closed stream
|
|
||||||
deadStream := make(chan log.Record)
|
|
||||||
close(deadStream)
|
|
||||||
return deadStream, stop
|
|
||||||
}
|
|
||||||
|
|
||||||
newStream := make(chan log.Record, 128)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case rec := <-stream:
|
|
||||||
newStream <- rec
|
|
||||||
case <-stop:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return newStream, stop
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLog returns a new log interface
|
// NewLog returns a new log interface
|
||||||
|
@ -27,42 +27,40 @@ func NewClient(name string) *debugClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logs queries the service logs and returns a channel to read the logs from
|
// Logs queries the services logs and returns a channel to read the logs from
|
||||||
func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
func (d *debugClient) Log(since time.Time, count int, stream bool) (log.Stream, error) {
|
||||||
var options log.ReadOptions
|
|
||||||
// initialize the read options
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
|
|
||||||
req := &pb.LogRequest{}
|
req := &pb.LogRequest{}
|
||||||
if !options.Since.IsZero() {
|
if !since.IsZero() {
|
||||||
req.Since = options.Since.Unix()
|
req.Since = since.Unix()
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Count > 0 {
|
if count > 0 {
|
||||||
req.Count = int64(options.Count)
|
req.Count = int64(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Stream = options.Stream
|
// set whether to stream
|
||||||
|
req.Stream = stream
|
||||||
|
|
||||||
// get the log stream
|
// get the log stream
|
||||||
stream, err := d.Client.Log(context.Background(), req)
|
serverStream, err := d.Client.Log(context.Background(), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed getting log stream: %s", err)
|
return nil, fmt.Errorf("failed getting log stream: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// log channel for streaming logs
|
lg := &logStream{
|
||||||
logChan := make(chan log.Record)
|
stream: make(chan log.Record),
|
||||||
|
stop: make(chan bool),
|
||||||
|
}
|
||||||
|
|
||||||
// go stream logs
|
// go stream logs
|
||||||
go d.streamLogs(logChan, stream)
|
go d.streamLogs(lg, serverStream)
|
||||||
|
|
||||||
return logChan, nil
|
return lg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
|
func (d *debugClient) streamLogs(lg *logStream, stream pb.Debug_LogService) {
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
defer lg.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
resp, err := stream.Recv()
|
resp, err := stream.Recv()
|
||||||
@ -81,8 +79,10 @@ func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogSer
|
|||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
}
|
}
|
||||||
|
|
||||||
logChan <- record
|
select {
|
||||||
|
case <-lg.stop:
|
||||||
|
return
|
||||||
|
case lg.stream <- record:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(logChan)
|
|
||||||
}
|
}
|
||||||
|
25
debug/service/stream.go
Normal file
25
debug/service/stream.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/micro/go-micro/debug/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type logStream struct {
|
||||||
|
stream chan log.Record
|
||||||
|
stop chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logStream) Chan() <-chan log.Record {
|
||||||
|
return l.stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logStream) Stop() error {
|
||||||
|
select {
|
||||||
|
case <-l.stop:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
close(l.stream)
|
||||||
|
close(l.stop)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user