Merge branch 'master' of https://github.com/micro/go-micro into kubernetes-logging

This commit is contained in:
Jake Sanders 2019-12-17 16:13:36 +00:00
commit 34b1c403bb
9 changed files with 156 additions and 64 deletions

View File

@ -4,17 +4,17 @@ import (
"fmt" "fmt"
golog "log" golog "log"
"github.com/micro/go-micro/debug/buffer" "github.com/micro/go-micro/util/ring"
) )
var ( var (
// DefaultSize of the logger buffer // DefaultSize of the logger buffer
DefaultSize = 1000 DefaultSize = 1024
) )
// defaultLog is default micro log // defaultLog is default micro log
type defaultLog struct { type defaultLog struct {
*buffer.Buffer *ring.Buffer
} }
// NewLog returns default Logger with // NewLog returns default Logger with
@ -28,7 +28,7 @@ func NewLog(opts ...Option) Log {
} }
return &defaultLog{ return &defaultLog{
Buffer: buffer.New(options.Size), Buffer: ring.New(options.Size),
} }
} }
@ -46,7 +46,7 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
o(&options) o(&options)
} }
var entries []*buffer.Entry var entries []*ring.Entry
// if Since options ha sbeen specified we honor it // if Since options ha sbeen specified we honor it
if !options.Since.IsZero() { if !options.Since.IsZero() {
entries = l.Buffer.Since(options.Since) entries = l.Buffer.Since(options.Since)
@ -82,9 +82,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
} }
// Stream returns channel for reading log records // Stream returns channel for reading log records
func (l *defaultLog) Stream(stop chan bool) <-chan Record { // along with a stop channel, close it when done
func (l *defaultLog) Stream() (<-chan Record, chan bool) {
// get stream channel from ring buffer // get stream channel from ring buffer
stream := l.Buffer.Stream(stop) stream, stop := l.Buffer.Stream()
// make a buffered channel // make a buffered channel
records := make(chan Record, 128) records := make(chan Record, 128)
// get last 10 records // get last 10 records
@ -110,5 +111,5 @@ func (l *defaultLog) Stream(stop chan bool) <-chan Record {
} }
}() }()
return records return records, stop
} }

View File

@ -23,7 +23,7 @@ type Log interface {
// Write writes records to log // Write writes records to log
Write(Record) Write(Record)
// Stream log records // Stream log records
Stream(chan bool) <-chan Record Stream() (<-chan Record, chan bool)
} }
// Record is log record entry // Record is log record entry
@ -174,6 +174,6 @@ func SetPrefix(p string) {
} }
// Set service name // Set service name
func Name(name string) { func SetName(name string) {
prefix = fmt.Sprintf("[%s]", name) prefix = fmt.Sprintf("[%s]", name)
} }

View File

@ -7,10 +7,19 @@ type Option func(*Options)
// Options are logger options // Options are logger options
type Options struct { type Options struct {
// Name of the log
Name string
// Size is the size of ring buffer // Size is the size of ring buffer
Size int Size int
} }
// Name of the log
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
// Size sets the size of the ring buffer // Size sets the size of the ring buffer
func Size(s int) Option { func Size(s int) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -1,4 +1,4 @@
// Pacjage handler implements service debug handler // Package handler implements service debug handler embedded in go-micro services
package handler package handler
import ( import (
@ -66,26 +66,27 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
} }
if req.Stream { if req.Stream {
stop := make(chan bool) // TODO: we need to figure out how to close the log stream
defer close(stop)
// TODO: we need to figure out how to close ithe log stream
// It seems like when a client disconnects, // It seems like when a client disconnects,
// the connection stays open until some timeout expires // the connection stays open until some timeout expires
// or something like that; that means the map of streams // or something like that; that means the map of streams
// might end up leaking memory if not cleaned up properly // might end up leaking memory if not cleaned up properly
records := d.log.Stream(stop) records, stop := d.log.Stream()
defer close(stop)
for record := range records { for record := range records {
if err := d.sendRecord(record, stream); err != nil { if err := d.sendRecord(record, stream); err != nil {
return err return err
} }
} }
// done streaming, return // done streaming, return
return nil return nil
} }
// get the log records // get the log records
records := d.log.Read(options...) records := d.log.Read(options...)
// send all the logs downstream // send all the logs downstream
for _, record := range records { for _, record := range records {
if err := d.sendRecord(record, stream); err != nil { if err := d.sendRecord(record, stream); err != nil {
@ -102,15 +103,9 @@ func (d *Debug) sendRecord(record log.Record, stream server.Stream) error {
metadata[k] = v metadata[k] = v
} }
pbRecord := &proto.Record{ return stream.Send(&proto.Record{
Timestamp: record.Timestamp.Unix(), Timestamp: record.Timestamp.Unix(),
Value: record.Value.(string), Value: record.Value.(string),
Metadata: metadata, Metadata: metadata,
} })
if err := stream.Send(pbRecord); err != nil {
return err
}
return nil
} }

76
debug/service/log.go Normal file
View File

@ -0,0 +1,76 @@
package service
import (
"github.com/micro/go-micro/debug"
"github.com/micro/go-micro/debug/log"
)
type serviceLog struct {
Client *debugClient
}
// Read reads log entries from the logger
func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record {
// TODO: parse opts
stream, err := s.Client.Log(opts...)
if err != nil {
return nil
}
// stream the records until nothing is left
var records []log.Record
for record := range stream {
records = append(records, record)
}
return records
}
// There is no write support
func (s *serviceLog) Write(r log.Record) {
return
}
// Stream log records
func (s *serviceLog) Stream() (<-chan log.Record, chan bool) {
stop := make(chan bool)
stream, err := s.Client.Log(log.Stream(true))
if err != nil {
// return a closed stream
deadStream := make(chan log.Record)
close(deadStream)
return deadStream, stop
}
newStream := make(chan log.Record, 128)
go func() {
for {
select {
case rec := <-stream:
newStream <- rec
case <-stop:
return
}
}
}()
return newStream, stop
}
// NewLog returns a new log interface
func NewLog(opts ...log.Option) log.Log {
var options log.Options
for _, o := range opts {
o(&options)
}
name := options.Name
// set the default name
if len(name) == 0 {
name = debug.DefaultName
}
return &serviceLog{
Client: NewClient(name),
}
}

View File

@ -1,3 +1,4 @@
// Package service provides the service log
package service package service
import ( import (
@ -12,23 +13,23 @@ import (
) )
// Debug provides debug service client // Debug provides debug service client
type Debug struct { type debugClient struct {
dbg pb.DebugService Client pb.DebugService
} }
// NewDebug provides Debug service implementation // NewClient provides a debug client
func NewDebug(name string) *Debug { func NewClient(name string) *debugClient {
// create default client // create default client
cli := client.DefaultClient cli := client.DefaultClient
return &Debug{ return &debugClient{
dbg: pb.NewDebugService(name, cli), Client: pb.NewDebugService(name, cli),
} }
} }
// Logs queries the service logs and returns a channel to read the logs from // Logs queries the service logs and returns a channel to read the logs from
func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) { func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
options := log.ReadOptions{} var options log.ReadOptions
// initialize the read options // initialize the read options
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
@ -46,20 +47,21 @@ func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
req.Stream = options.Stream req.Stream = options.Stream
// get the log stream // get the log stream
stream, err := d.dbg.Log(context.Background(), req) stream, err := d.Client.Log(context.Background(), req)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed getting log stream: %s", err) return nil, fmt.Errorf("failed getting log stream: %s", err)
} }
// log channel for streaming logs // log channel for streaming logs
logChan := make(chan log.Record) logChan := make(chan log.Record)
// go stream logs // go stream logs
go d.streamLogs(logChan, stream) go d.streamLogs(logChan, stream)
return logChan, nil return logChan, nil
} }
func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
defer stream.Close() defer stream.Close()
for { for {

View File

@ -1,11 +1,11 @@
package stats package stats
import ( import (
"github.com/micro/go-micro/debug/buffer" "github.com/micro/go-micro/util/ring"
) )
type stats struct { type stats struct {
buffer *buffer.Buffer buffer *ring.Buffer
} }
func (s *stats) Read() ([]*Stat, error) { func (s *stats) Read() ([]*Stat, error) {
@ -33,6 +33,6 @@ func (s *stats) Write(stat *Stat) error {
// TODO add options // TODO add options
func NewStats() Stats { func NewStats() Stats {
return &stats{ return &stats{
buffer: buffer.New(1024), buffer: ring.New(1024),
} }
} }

View File

@ -1,5 +1,5 @@
// Package buffer provides a simple ring buffer for storing local data // Package ring provides a simple ring buffer for storing local data
package buffer package ring
import ( import (
"sync" "sync"
@ -8,18 +8,13 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
type stream struct {
id string
entries chan *Entry
stop chan bool
}
// Buffer is ring buffer // Buffer is ring buffer
type Buffer struct { type Buffer struct {
size int size int
sync.RWMutex sync.RWMutex
vals []*Entry vals []*Entry
streams map[string]stream streams map[string]*Stream
} }
// Entry is ring buffer data entry // Entry is ring buffer data entry
@ -28,12 +23,14 @@ type Entry struct {
Timestamp time.Time Timestamp time.Time
} }
// New returns a new buffer of the given size // Stream is used to stream the buffer
func New(i int) *Buffer { type Stream struct {
return &Buffer{ // Id of the stream
size: i, Id string
streams: make(map[string]stream), // Buffered entries
} Entries chan *Entry
// Stop channel
Stop chan bool
} }
// Put adds a new value to ring buffer // Put adds a new value to ring buffer
@ -53,13 +50,13 @@ func (b *Buffer) Put(v interface{}) {
b.vals = b.vals[1:] b.vals = b.vals[1:]
} }
// TODO: this is fucking ugly // send to every stream
for _, stream := range b.streams { for _, stream := range b.streams {
select { select {
case <-stream.stop: case <-stream.Stop:
delete(b.streams, stream.id) delete(b.streams, stream.Id)
close(stream.entries) close(stream.Entries)
case stream.entries <- entry: case stream.Entries <- entry:
} }
} }
} }
@ -115,22 +112,34 @@ func (b *Buffer) Since(t time.Time) []*Entry {
} }
// Stream logs from the buffer // Stream logs from the buffer
func (b *Buffer) Stream(stop chan bool) <-chan *Entry { // Close the channel when you want to stop
func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
entries := make(chan *Entry, 128) entries := make(chan *Entry, 128)
id := uuid.New().String() id := uuid.New().String()
b.streams[id] = stream{ stop := make(chan bool)
id: id,
entries: entries, b.streams[id] = &Stream{
stop: stop, Id: id,
Entries: entries,
Stop: stop,
} }
return entries return entries, stop
} }
// Size returns the size of the ring buffer // Size returns the size of the ring buffer
func (b *Buffer) Size() int { func (b *Buffer) Size() int {
return b.size return b.size
} }
// New returns a new buffer of the given size
func New(i int) *Buffer {
return &Buffer{
size: i,
streams: make(map[string]*Stream),
}
}

View File

@ -1,4 +1,4 @@
package buffer package ring
import ( import (
"testing" "testing"