Added hack support for logs streaming cruft

This commit is contained in:
Milos Gajdos 2019-11-30 12:39:29 +00:00
parent 7f1dea72f2
commit ecdadef633
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
5 changed files with 112 additions and 24 deletions

View File

@ -4,13 +4,22 @@ package buffer
import ( import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
) )
type stream struct {
id string
entries chan *Entry
stop chan bool
}
// Buffer is ring buffer // Buffer is ring buffer
type Buffer struct { type Buffer struct {
size int size int
sync.RWMutex sync.RWMutex
vals []*Entry vals []*Entry
streams map[string]stream
} }
// Entry is ring buffer data entry // Entry is ring buffer data entry
@ -22,7 +31,8 @@ type Entry struct {
// New returns a new buffer of the given size // New returns a new buffer of the given size
func New(i int) *Buffer { func New(i int) *Buffer {
return &Buffer{ return &Buffer{
size: i, size: i,
streams: make(map[string]stream),
} }
} }
@ -32,15 +42,26 @@ func (b *Buffer) Put(v interface{}) {
defer b.Unlock() defer b.Unlock()
// append to values // append to values
b.vals = append(b.vals, &Entry{ entry := &Entry{
Value: v, Value: v,
Timestamp: time.Now(), Timestamp: time.Now(),
}) }
b.vals = append(b.vals, entry)
// trim if bigger than size required // trim if bigger than size required
if len(b.vals) > b.size { if len(b.vals) > b.size {
b.vals = b.vals[1:] b.vals = b.vals[1:]
} }
// TODO: this is fucking ugly
for _, stream := range b.streams {
select {
case <-stream.stop:
delete(b.streams, stream.id)
close(stream.entries)
case stream.entries <- entry:
}
}
} }
// Get returns the last n entries // Get returns the last n entries
@ -93,6 +114,22 @@ func (b *Buffer) Since(t time.Time) []*Entry {
return nil return nil
} }
// Stream logs from the buffer
func (b *Buffer) Stream(stop chan bool) <-chan *Entry {
b.Lock()
defer b.Unlock()
entries := make(chan *Entry, 128)
id := uuid.New().String()
b.streams[id] = stream{
id: id,
entries: entries,
stop: stop,
}
return entries
}
// Size returns the size of the ring buffer // Size returns the size of the ring buffer
func (b *Buffer) Size() int { func (b *Buffer) Size() int {
return b.size return b.size

View File

@ -59,27 +59,52 @@ func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.De
options = append(options, log.Count(count)) options = append(options, log.Count(count))
} }
if req.Stream {
stop := make(chan bool)
defer close(stop)
// TODO: figure out how to close log stream
// It seems when the client disconnects,
// the connection stays open until some timeout expires
// or something like that; that means the map of streams
// might end up bloating if not cleaned up properly
records := d.log.Stream(stop)
for record := range records {
if err := d.sendRecord(record, stream); err != nil {
return err
}
}
// done streaming, return
return nil
}
// get the log records // get the log records
records := d.log.Read(options...) records := d.log.Read(options...)
// send all the logs downstream
// TODO: figure out the stream
for _, record := range records { for _, record := range records {
metadata := make(map[string]string) if err := d.sendRecord(record, stream); err != nil {
for k, v := range record.Metadata {
metadata[k] = v
}
recLog := &proto.Log{
Timestamp: record.Timestamp.UnixNano(),
Value: record.Value.(string),
Metadata: metadata,
}
if err := stream.Send(recLog); err != nil {
return err return err
} }
} }
return nil return nil
} }
func (d *Debug) sendRecord(record log.Record, stream proto.Debug_LogsStream) error {
metadata := make(map[string]string)
for k, v := range record.Metadata {
metadata[k] = v
}
recLog := &proto.Log{
Timestamp: record.Timestamp.UnixNano(),
Value: record.Value.(string),
Metadata: metadata,
}
if err := stream.Send(recLog); err != nil {
return err
}
return nil
}

View File

@ -34,8 +34,8 @@ func NewLog(opts ...Option) Log {
// Write writes logs into logger // Write writes logs into logger
func (l *defaultLog) Write(v ...interface{}) { func (l *defaultLog) Write(v ...interface{}) {
l.Buffer.Put(fmt.Sprint(v...))
golog.Print(v...) golog.Print(v...)
l.Buffer.Put(fmt.Sprint(v...))
} }
// Read reads logs and returns them // Read reads logs and returns them
@ -53,10 +53,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
} }
// only if we specified valid count constraint // only if we specified valid count constraint
// do we do some serious if-else kung-fu // do we end up doing some serious if-else kung-fu
// if since has been given we return *count* number of // if since constraint has been provided
// logs since the requested timestamp; // we return *count* number of logs since the given timestamp;
// otherwise we retourn last count number of logs // otherwise we return last count number of logs
if options.Count > 0 { if options.Count > 0 {
switch len(entries) > 0 { switch len(entries) > 0 {
case true: case true:
@ -80,3 +80,25 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
return records return records
} }
// Stream returns channel for reading log records
func (l *defaultLog) Stream(stop chan bool) <-chan Record {
// get stream channel from ring buffer
stream := l.Buffer.Stream(stop)
records := make(chan Record)
fmt.Println("requested log stream")
// stream the log records
go func() {
for entry := range stream {
records <- Record{
Timestamp: entry.Timestamp,
Value: entry.Value,
Metadata: make(map[string]string),
}
}
}()
return records
}

View File

@ -22,6 +22,8 @@ type Log interface {
Read(...ReadOption) []Record Read(...ReadOption) []Record
// Write writes logs to logger // Write writes logs to logger
Write(...interface{}) Write(...interface{})
// Stream logs
Stream(chan bool) <-chan Record
} }
// Record is log record entry // Record is log record entry

View File

@ -43,6 +43,8 @@ func (d *Debug) Logs(opts ...log.ReadOption) (<-chan log.Record, error) {
req.Count = int64(options.Count) req.Count = int64(options.Count)
} }
req.Stream = options.Stream
// get the log stream // get the log stream
stream, err := d.dbg.Logs(context.Background(), req) stream, err := d.dbg.Logs(context.Background(), req)
if err != nil { if err != nil {