Decruft the debug logger interface
This commit is contained in:
@@ -1,136 +0,0 @@
|
||||
// Package buffer provides a simple ring buffer for storing local data
|
||||
package buffer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type stream struct {
|
||||
id string
|
||||
entries chan *Entry
|
||||
stop chan bool
|
||||
}
|
||||
|
||||
// Buffer is ring buffer
|
||||
type Buffer struct {
|
||||
size int
|
||||
sync.RWMutex
|
||||
vals []*Entry
|
||||
streams map[string]stream
|
||||
}
|
||||
|
||||
// Entry is ring buffer data entry
|
||||
type Entry struct {
|
||||
Value interface{}
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// New returns a new buffer of the given size
|
||||
func New(i int) *Buffer {
|
||||
return &Buffer{
|
||||
size: i,
|
||||
streams: make(map[string]stream),
|
||||
}
|
||||
}
|
||||
|
||||
// Put adds a new value to ring buffer
|
||||
func (b *Buffer) Put(v interface{}) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
// append to values
|
||||
entry := &Entry{
|
||||
Value: v,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
b.vals = append(b.vals, entry)
|
||||
|
||||
// trim if bigger than size required
|
||||
if len(b.vals) > b.size {
|
||||
b.vals = b.vals[1:]
|
||||
}
|
||||
|
||||
// TODO: this is fucking ugly
|
||||
for _, stream := range b.streams {
|
||||
select {
|
||||
case <-stream.stop:
|
||||
delete(b.streams, stream.id)
|
||||
close(stream.entries)
|
||||
case stream.entries <- entry:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns the last n entries
|
||||
func (b *Buffer) Get(n int) []*Entry {
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
|
||||
// reset any invalid values
|
||||
if n > b.size || n < 0 {
|
||||
n = b.size
|
||||
}
|
||||
|
||||
// create a delta
|
||||
delta := b.size - n
|
||||
|
||||
// if all the values are less than delta
|
||||
if len(b.vals) < delta {
|
||||
return b.vals
|
||||
}
|
||||
|
||||
// return the delta set
|
||||
return b.vals[delta:]
|
||||
}
|
||||
|
||||
// Return the entries since a specific time
|
||||
func (b *Buffer) Since(t time.Time) []*Entry {
|
||||
b.RLock()
|
||||
defer b.RUnlock()
|
||||
|
||||
// return all the values
|
||||
if t.IsZero() {
|
||||
return b.vals
|
||||
}
|
||||
|
||||
// if its in the future return nothing
|
||||
if time.Since(t).Seconds() < 0.0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, v := range b.vals {
|
||||
// find the starting point
|
||||
d := v.Timestamp.Sub(t)
|
||||
|
||||
// return the values
|
||||
if d.Seconds() > 0.0 {
|
||||
return b.vals[i:]
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream logs from the buffer
|
||||
func (b *Buffer) Stream(stop chan bool) <-chan *Entry {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
entries := make(chan *Entry, 128)
|
||||
id := uuid.New().String()
|
||||
b.streams[id] = stream{
|
||||
id: id,
|
||||
entries: entries,
|
||||
stop: stop,
|
||||
}
|
||||
|
||||
return entries
|
||||
}
|
||||
|
||||
// Size returns the size of the ring buffer
|
||||
func (b *Buffer) Size() int {
|
||||
return b.size
|
||||
}
|
@@ -1,79 +0,0 @@
|
||||
package buffer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestBuffer(t *testing.T) {
|
||||
b := New(10)
|
||||
|
||||
// test one value
|
||||
b.Put("foo")
|
||||
v := b.Get(1)
|
||||
|
||||
if val := v[0].Value.(string); val != "foo" {
|
||||
t.Fatalf("expected foo got %v", val)
|
||||
}
|
||||
|
||||
b = New(10)
|
||||
|
||||
// test 10 values
|
||||
for i := 0; i < 10; i++ {
|
||||
b.Put(i)
|
||||
}
|
||||
|
||||
d := time.Now()
|
||||
v = b.Get(10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
val := v[i].Value.(int)
|
||||
|
||||
if val != i {
|
||||
t.Fatalf("expected %d got %d", i, val)
|
||||
}
|
||||
}
|
||||
|
||||
// test more values
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
v := i * 2
|
||||
b.Put(v)
|
||||
}
|
||||
|
||||
v = b.Get(10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
val := v[i].Value.(int)
|
||||
expect := i * 2
|
||||
if val != expect {
|
||||
t.Fatalf("expected %d got %d", expect, val)
|
||||
}
|
||||
}
|
||||
|
||||
// sleep 100 ms
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
// assume we'll get everything
|
||||
v = b.Since(d)
|
||||
|
||||
if len(v) != 10 {
|
||||
t.Fatalf("expected 10 entries but got %d", len(v))
|
||||
}
|
||||
|
||||
// write 1 more entry
|
||||
d = time.Now()
|
||||
b.Put(100)
|
||||
|
||||
// sleep 100 ms
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
v = b.Since(d)
|
||||
if len(v) != 1 {
|
||||
t.Fatalf("expected 1 entries but got %d", len(v))
|
||||
}
|
||||
|
||||
if v[0].Value.(int) != 100 {
|
||||
t.Fatalf("expected value 100 got %v", v[0])
|
||||
}
|
||||
}
|
@@ -4,17 +4,17 @@ import (
|
||||
"fmt"
|
||||
golog "log"
|
||||
|
||||
"github.com/micro/go-micro/debug/buffer"
|
||||
"github.com/micro/go-micro/util/ring"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultSize of the logger buffer
|
||||
DefaultSize = 1000
|
||||
DefaultSize = 1024
|
||||
)
|
||||
|
||||
// defaultLog is default micro log
|
||||
type defaultLog struct {
|
||||
*buffer.Buffer
|
||||
*ring.Buffer
|
||||
}
|
||||
|
||||
// NewLog returns default Logger with
|
||||
@@ -28,7 +28,7 @@ func NewLog(opts ...Option) Log {
|
||||
}
|
||||
|
||||
return &defaultLog{
|
||||
Buffer: buffer.New(options.Size),
|
||||
Buffer: ring.New(options.Size),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
var entries []*buffer.Entry
|
||||
var entries []*ring.Entry
|
||||
// if Since options ha sbeen specified we honor it
|
||||
if !options.Since.IsZero() {
|
||||
entries = l.Buffer.Since(options.Since)
|
||||
@@ -82,9 +82,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
||||
}
|
||||
|
||||
// Stream returns channel for reading log records
|
||||
func (l *defaultLog) Stream(stop chan bool) <-chan Record {
|
||||
// along with a stop channel, close it when done
|
||||
func (l *defaultLog) Stream() (<-chan Record, chan bool) {
|
||||
// get stream channel from ring buffer
|
||||
stream := l.Buffer.Stream(stop)
|
||||
stream, stop := l.Buffer.Stream()
|
||||
// make a buffered channel
|
||||
records := make(chan Record, 128)
|
||||
// get last 10 records
|
||||
@@ -110,5 +111,5 @@ func (l *defaultLog) Stream(stop chan bool) <-chan Record {
|
||||
}
|
||||
}()
|
||||
|
||||
return records
|
||||
return records, stop
|
||||
}
|
||||
|
@@ -23,7 +23,7 @@ type Log interface {
|
||||
// Write writes records to log
|
||||
Write(Record)
|
||||
// Stream log records
|
||||
Stream(chan bool) <-chan Record
|
||||
Stream() (<-chan Record, chan bool)
|
||||
}
|
||||
|
||||
// Record is log record entry
|
||||
|
@@ -1,4 +1,4 @@
|
||||
// Pacjage handler implements service debug handler
|
||||
// Package handler implements service debug handler embedded in go-micro services
|
||||
package handler
|
||||
|
||||
import (
|
||||
@@ -66,26 +66,27 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
|
||||
}
|
||||
|
||||
if req.Stream {
|
||||
stop := make(chan bool)
|
||||
defer close(stop)
|
||||
|
||||
// TODO: we need to figure out how to close ithe log stream
|
||||
// TODO: we need to figure out how to close the log stream
|
||||
// It seems like when a client disconnects,
|
||||
// the connection stays open until some timeout expires
|
||||
// or something like that; that means the map of streams
|
||||
// might end up leaking memory if not cleaned up properly
|
||||
records := d.log.Stream(stop)
|
||||
records, stop := d.log.Stream()
|
||||
defer close(stop)
|
||||
|
||||
for record := range records {
|
||||
if err := d.sendRecord(record, stream); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// done streaming, return
|
||||
return nil
|
||||
}
|
||||
|
||||
// get the log records
|
||||
records := d.log.Read(options...)
|
||||
|
||||
// send all the logs downstream
|
||||
for _, record := range records {
|
||||
if err := d.sendRecord(record, stream); err != nil {
|
||||
@@ -102,15 +103,9 @@ func (d *Debug) sendRecord(record log.Record, stream server.Stream) error {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
pbRecord := &proto.Record{
|
||||
return stream.Send(&proto.Record{
|
||||
Timestamp: record.Timestamp.Unix(),
|
||||
Value: record.Value.(string),
|
||||
Metadata: metadata,
|
||||
}
|
||||
|
||||
if err := stream.Send(pbRecord); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
81
debug/service/log.go
Normal file
81
debug/service/log.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/debug/log"
|
||||
pb "github.com/micro/go-micro/debug/service/proto"
|
||||
)
|
||||
|
||||
type serviceLog struct {
|
||||
Client *debugClient
|
||||
}
|
||||
|
||||
// Read reads log entries from the logger
|
||||
func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record {
|
||||
// TODO: parse opts
|
||||
stream, err := s.Client.Log(opts...)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
// stream the records until nothing is left
|
||||
var records []log.Record
|
||||
for _, record := range stream {
|
||||
records = append(records, record)
|
||||
}
|
||||
return records
|
||||
}
|
||||
|
||||
// There is no write support
|
||||
func (s *serviceLog) Write(r log.Record) {
|
||||
return
|
||||
}
|
||||
|
||||
// Stream log records
|
||||
func (s *serviceLog) Stream(ch chan bool) (<-chan log.Record, chan bool) {
|
||||
stop := make(chan bool)
|
||||
stream, err := s.Client.Log(log.Stream(true))
|
||||
if err != nil {
|
||||
// return a closed stream
|
||||
stream = make(chan log.Record)
|
||||
close(stream)
|
||||
return stream, stop
|
||||
}
|
||||
|
||||
// stream the records until nothing is left
|
||||
go func() {
|
||||
var records []log.Record
|
||||
for _, record := range stream {
|
||||
select {
|
||||
case stream <- record:
|
||||
case <-stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// return the stream
|
||||
return stream, stop
|
||||
}
|
||||
|
||||
// NewLog returns a new log interface
|
||||
func NewLog(opts ...log.Option) log.Log {
|
||||
var options log.Options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
name := options.Name
|
||||
|
||||
// set the default name
|
||||
if len(name) == 0 {
|
||||
name = debug.DefaultName
|
||||
}
|
||||
|
||||
return serviceLog{
|
||||
Client: newDebugClient(name),
|
||||
}
|
||||
}
|
@@ -1,3 +1,4 @@
|
||||
// Package service provides the service log
|
||||
package service
|
||||
|
||||
import (
|
||||
@@ -12,23 +13,23 @@ import (
|
||||
)
|
||||
|
||||
// Debug provides debug service client
|
||||
type Debug struct {
|
||||
dbg pb.DebugService
|
||||
type debugClient struct {
|
||||
Client pb.DebugService
|
||||
}
|
||||
|
||||
// NewDebug provides Debug service implementation
|
||||
func NewDebug(name string) *Debug {
|
||||
func newDebugClient(name string) *debug {
|
||||
// create default client
|
||||
cli := client.DefaultClient
|
||||
|
||||
return &Debug{
|
||||
dbg: pb.NewDebugService(name, cli),
|
||||
return &debugClient{
|
||||
Client: pb.NewDebugService(name, cli),
|
||||
}
|
||||
}
|
||||
|
||||
// Logs queries the service logs and returns a channel to read the logs from
|
||||
func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
||||
options := log.ReadOptions{}
|
||||
func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
||||
var options log.ReadOptions
|
||||
// initialize the read options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@@ -46,20 +47,21 @@ func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
||||
req.Stream = options.Stream
|
||||
|
||||
// get the log stream
|
||||
stream, err := d.dbg.Log(context.Background(), req)
|
||||
stream, err := d.Client.Log(context.Background(), req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed getting log stream: %s", err)
|
||||
}
|
||||
|
||||
// log channel for streaming logs
|
||||
logChan := make(chan log.Record)
|
||||
|
||||
// go stream logs
|
||||
go d.streamLogs(logChan, stream)
|
||||
|
||||
return logChan, nil
|
||||
}
|
||||
|
||||
func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
|
||||
func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
|
||||
defer stream.Close()
|
||||
|
||||
for {
|
||||
|
@@ -1,11 +1,11 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/debug/buffer"
|
||||
"github.com/micro/go-micro/util/ring"
|
||||
)
|
||||
|
||||
type stats struct {
|
||||
buffer *buffer.Buffer
|
||||
buffer *ring.Buffer
|
||||
}
|
||||
|
||||
func (s *stats) Read() ([]*Stat, error) {
|
||||
@@ -33,6 +33,6 @@ func (s *stats) Write(stat *Stat) error {
|
||||
// TODO add options
|
||||
func NewStats() Stats {
|
||||
return &stats{
|
||||
buffer: buffer.New(1024),
|
||||
buffer: ring.New(1024),
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user