diff --git a/debug/log/default.go b/debug/log/default.go index 6f7bddab..3d79bdc9 100644 --- a/debug/log/default.go +++ b/debug/log/default.go @@ -4,17 +4,17 @@ import ( "fmt" golog "log" - "github.com/micro/go-micro/debug/buffer" + "github.com/micro/go-micro/util/ring" ) var ( // DefaultSize of the logger buffer - DefaultSize = 1000 + DefaultSize = 1024 ) // defaultLog is default micro log type defaultLog struct { - *buffer.Buffer + *ring.Buffer } // NewLog returns default Logger with @@ -28,7 +28,7 @@ func NewLog(opts ...Option) Log { } return &defaultLog{ - Buffer: buffer.New(options.Size), + Buffer: ring.New(options.Size), } } @@ -46,7 +46,7 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { o(&options) } - var entries []*buffer.Entry + var entries []*ring.Entry // if Since options ha sbeen specified we honor it if !options.Since.IsZero() { entries = l.Buffer.Since(options.Since) @@ -82,9 +82,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { } // Stream returns channel for reading log records -func (l *defaultLog) Stream(stop chan bool) <-chan Record { +// along with a stop channel, close it when done +func (l *defaultLog) Stream() (<-chan Record, chan bool) { // get stream channel from ring buffer - stream := l.Buffer.Stream(stop) + stream, stop := l.Buffer.Stream() // make a buffered channel records := make(chan Record, 128) // get last 10 records @@ -110,5 +111,5 @@ func (l *defaultLog) Stream(stop chan bool) <-chan Record { } }() - return records + return records, stop } diff --git a/debug/log/log.go b/debug/log/log.go index f4ad0d76..1e21b904 100644 --- a/debug/log/log.go +++ b/debug/log/log.go @@ -23,7 +23,7 @@ type Log interface { // Write writes records to log Write(Record) // Stream log records - Stream(chan bool) <-chan Record + Stream() (<-chan Record, chan bool) } // Record is log record entry @@ -174,6 +174,6 @@ func SetPrefix(p string) { } // Set service name -func Name(name string) { +func SetName(name string) { prefix = fmt.Sprintf("[%s]", name) } diff --git a/debug/log/options.go b/debug/log/options.go index 03dece38..0d83394b 100644 --- a/debug/log/options.go +++ b/debug/log/options.go @@ -7,10 +7,19 @@ type Option func(*Options) // Options are logger options type Options struct { + // Name of the log + Name string // Size is the size of ring buffer Size int } +// Name of the log +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + // Size sets the size of the ring buffer func Size(s int) Option { return func(o *Options) { diff --git a/debug/service/handler/debug.go b/debug/service/handler/debug.go index ee24b27a..7f2f16e4 100644 --- a/debug/service/handler/debug.go +++ b/debug/service/handler/debug.go @@ -1,4 +1,4 @@ -// Pacjage handler implements service debug handler +// Package handler implements service debug handler embedded in go-micro services package handler import ( @@ -66,26 +66,27 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error { } if req.Stream { - stop := make(chan bool) - defer close(stop) - - // TODO: we need to figure out how to close ithe log stream + // TODO: we need to figure out how to close the log stream // It seems like when a client disconnects, // the connection stays open until some timeout expires // or something like that; that means the map of streams // might end up leaking memory if not cleaned up properly - records := d.log.Stream(stop) + records, stop := d.log.Stream() + defer close(stop) + for record := range records { if err := d.sendRecord(record, stream); err != nil { return err } } + // done streaming, return return nil } // get the log records records := d.log.Read(options...) + // send all the logs downstream for _, record := range records { if err := d.sendRecord(record, stream); err != nil { @@ -102,15 +103,9 @@ func (d *Debug) sendRecord(record log.Record, stream server.Stream) error { metadata[k] = v } - pbRecord := &proto.Record{ + return stream.Send(&proto.Record{ Timestamp: record.Timestamp.Unix(), Value: record.Value.(string), Metadata: metadata, - } - - if err := stream.Send(pbRecord); err != nil { - return err - } - - return nil + }) } diff --git a/debug/service/log.go b/debug/service/log.go new file mode 100644 index 00000000..647b349a --- /dev/null +++ b/debug/service/log.go @@ -0,0 +1,76 @@ +package service + +import ( + "github.com/micro/go-micro/debug" + "github.com/micro/go-micro/debug/log" +) + +type serviceLog struct { + Client *debugClient +} + +// Read reads log entries from the logger +func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record { + // TODO: parse opts + stream, err := s.Client.Log(opts...) + if err != nil { + return nil + } + // stream the records until nothing is left + var records []log.Record + for record := range stream { + records = append(records, record) + } + return records +} + +// There is no write support +func (s *serviceLog) Write(r log.Record) { + return +} + +// Stream log records +func (s *serviceLog) Stream() (<-chan log.Record, chan bool) { + stop := make(chan bool) + stream, err := s.Client.Log(log.Stream(true)) + if err != nil { + // return a closed stream + deadStream := make(chan log.Record) + close(deadStream) + return deadStream, stop + } + + newStream := make(chan log.Record, 128) + + go func() { + for { + select { + case rec := <-stream: + newStream <- rec + case <-stop: + return + } + } + }() + + return newStream, stop +} + +// NewLog returns a new log interface +func NewLog(opts ...log.Option) log.Log { + var options log.Options + for _, o := range opts { + o(&options) + } + + name := options.Name + + // set the default name + if len(name) == 0 { + name = debug.DefaultName + } + + return &serviceLog{ + Client: NewClient(name), + } +} diff --git a/debug/service/service.go b/debug/service/service.go index d48b0a3d..1da11c6e 100644 --- a/debug/service/service.go +++ b/debug/service/service.go @@ -1,3 +1,4 @@ +// Package service provides the service log package service import ( @@ -12,23 +13,23 @@ import ( ) // Debug provides debug service client -type Debug struct { - dbg pb.DebugService +type debugClient struct { + Client pb.DebugService } -// NewDebug provides Debug service implementation -func NewDebug(name string) *Debug { +// NewClient provides a debug client +func NewClient(name string) *debugClient { // create default client cli := client.DefaultClient - return &Debug{ - dbg: pb.NewDebugService(name, cli), + return &debugClient{ + Client: pb.NewDebugService(name, cli), } } // Logs queries the service logs and returns a channel to read the logs from -func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) { - options := log.ReadOptions{} +func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) { + var options log.ReadOptions // initialize the read options for _, o := range opts { o(&options) @@ -46,20 +47,21 @@ func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) { req.Stream = options.Stream // get the log stream - stream, err := d.dbg.Log(context.Background(), req) + stream, err := d.Client.Log(context.Background(), req) if err != nil { return nil, fmt.Errorf("failed getting log stream: %s", err) } // log channel for streaming logs logChan := make(chan log.Record) + // go stream logs go d.streamLogs(logChan, stream) return logChan, nil } -func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { +func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { defer stream.Close() for { diff --git a/debug/stats/default.go b/debug/stats/default.go index b2d78fce..357d6303 100644 --- a/debug/stats/default.go +++ b/debug/stats/default.go @@ -1,11 +1,11 @@ package stats import ( - "github.com/micro/go-micro/debug/buffer" + "github.com/micro/go-micro/util/ring" ) type stats struct { - buffer *buffer.Buffer + buffer *ring.Buffer } func (s *stats) Read() ([]*Stat, error) { @@ -33,6 +33,6 @@ func (s *stats) Write(stat *Stat) error { // TODO add options func NewStats() Stats { return &stats{ - buffer: buffer.New(1024), + buffer: ring.New(1024), } } diff --git a/debug/buffer/buffer.go b/util/ring/buffer.go similarity index 72% rename from debug/buffer/buffer.go rename to util/ring/buffer.go index 007293aa..bb8febcf 100644 --- a/debug/buffer/buffer.go +++ b/util/ring/buffer.go @@ -1,5 +1,5 @@ -// Package buffer provides a simple ring buffer for storing local data -package buffer +// Package ring provides a simple ring buffer for storing local data +package ring import ( "sync" @@ -8,18 +8,13 @@ import ( "github.com/google/uuid" ) -type stream struct { - id string - entries chan *Entry - stop chan bool -} - // Buffer is ring buffer type Buffer struct { size int + sync.RWMutex vals []*Entry - streams map[string]stream + streams map[string]*Stream } // Entry is ring buffer data entry @@ -28,12 +23,14 @@ type Entry struct { Timestamp time.Time } -// New returns a new buffer of the given size -func New(i int) *Buffer { - return &Buffer{ - size: i, - streams: make(map[string]stream), - } +// Stream is used to stream the buffer +type Stream struct { + // Id of the stream + Id string + // Buffered entries + Entries chan *Entry + // Stop channel + Stop chan bool } // Put adds a new value to ring buffer @@ -53,13 +50,13 @@ func (b *Buffer) Put(v interface{}) { b.vals = b.vals[1:] } - // TODO: this is fucking ugly + // send to every stream for _, stream := range b.streams { select { - case <-stream.stop: - delete(b.streams, stream.id) - close(stream.entries) - case stream.entries <- entry: + case <-stream.Stop: + delete(b.streams, stream.Id) + close(stream.Entries) + case stream.Entries <- entry: } } } @@ -115,22 +112,34 @@ func (b *Buffer) Since(t time.Time) []*Entry { } // Stream logs from the buffer -func (b *Buffer) Stream(stop chan bool) <-chan *Entry { +// Close the channel when you want to stop +func (b *Buffer) Stream() (<-chan *Entry, chan bool) { b.Lock() defer b.Unlock() entries := make(chan *Entry, 128) id := uuid.New().String() - b.streams[id] = stream{ - id: id, - entries: entries, - stop: stop, + stop := make(chan bool) + + b.streams[id] = &Stream{ + Id: id, + Entries: entries, + Stop: stop, } - return entries + return entries, stop } // Size returns the size of the ring buffer func (b *Buffer) Size() int { return b.size } + +// New returns a new buffer of the given size +func New(i int) *Buffer { + return &Buffer{ + size: i, + streams: make(map[string]*Stream), + } +} + diff --git a/debug/buffer/buffer_test.go b/util/ring/buffer_test.go similarity index 98% rename from debug/buffer/buffer_test.go rename to util/ring/buffer_test.go index 3a107923..0c9b5378 100644 --- a/debug/buffer/buffer_test.go +++ b/util/ring/buffer_test.go @@ -1,4 +1,4 @@ -package buffer +package ring import ( "testing"