From bc30efcf709168a6f6d501c988d707c8f7578957 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 17 Dec 2019 15:38:03 +0000 Subject: [PATCH 1/3] Decruft the debug logger interface --- debug/log/default.go | 17 ++--- debug/log/log.go | 2 +- debug/service/handler/debug.go | 23 +++--- debug/service/log.go | 81 ++++++++++++++++++++++ debug/service/service.go | 20 +++--- debug/stats/default.go | 6 +- {debug/buffer => util/ring}/buffer.go | 61 +++++++++------- {debug/buffer => util/ring}/buffer_test.go | 2 +- 8 files changed, 150 insertions(+), 62 deletions(-) create mode 100644 debug/service/log.go rename {debug/buffer => util/ring}/buffer.go (72%) rename {debug/buffer => util/ring}/buffer_test.go (98%) diff --git a/debug/log/default.go b/debug/log/default.go index 6f7bddab..3d79bdc9 100644 --- a/debug/log/default.go +++ b/debug/log/default.go @@ -4,17 +4,17 @@ import ( "fmt" golog "log" - "github.com/micro/go-micro/debug/buffer" + "github.com/micro/go-micro/util/ring" ) var ( // DefaultSize of the logger buffer - DefaultSize = 1000 + DefaultSize = 1024 ) // defaultLog is default micro log type defaultLog struct { - *buffer.Buffer + *ring.Buffer } // NewLog returns default Logger with @@ -28,7 +28,7 @@ func NewLog(opts ...Option) Log { } return &defaultLog{ - Buffer: buffer.New(options.Size), + Buffer: ring.New(options.Size), } } @@ -46,7 +46,7 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { o(&options) } - var entries []*buffer.Entry + var entries []*ring.Entry // if Since options ha sbeen specified we honor it if !options.Since.IsZero() { entries = l.Buffer.Since(options.Since) @@ -82,9 +82,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { } // Stream returns channel for reading log records -func (l *defaultLog) Stream(stop chan bool) <-chan Record { +// along with a stop channel, close it when done +func (l *defaultLog) Stream() (<-chan Record, chan bool) { // get stream channel from ring buffer - stream := l.Buffer.Stream(stop) + stream, stop := l.Buffer.Stream() // make a buffered channel records := make(chan Record, 128) // get last 10 records @@ -110,5 +111,5 @@ func (l *defaultLog) Stream(stop chan bool) <-chan Record { } }() - return records + return records, stop } diff --git a/debug/log/log.go b/debug/log/log.go index 42a8f558..9425ff19 100644 --- a/debug/log/log.go +++ b/debug/log/log.go @@ -23,7 +23,7 @@ type Log interface { // Write writes records to log Write(Record) // Stream log records - Stream(chan bool) <-chan Record + Stream() (<-chan Record, chan bool) } // Record is log record entry diff --git a/debug/service/handler/debug.go b/debug/service/handler/debug.go index ee24b27a..7f2f16e4 100644 --- a/debug/service/handler/debug.go +++ b/debug/service/handler/debug.go @@ -1,4 +1,4 @@ -// Pacjage handler implements service debug handler +// Package handler implements service debug handler embedded in go-micro services package handler import ( @@ -66,26 +66,27 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error { } if req.Stream { - stop := make(chan bool) - defer close(stop) - - // TODO: we need to figure out how to close ithe log stream + // TODO: we need to figure out how to close the log stream // It seems like when a client disconnects, // the connection stays open until some timeout expires // or something like that; that means the map of streams // might end up leaking memory if not cleaned up properly - records := d.log.Stream(stop) + records, stop := d.log.Stream() + defer close(stop) + for record := range records { if err := d.sendRecord(record, stream); err != nil { return err } } + // done streaming, return return nil } // get the log records records := d.log.Read(options...) + // send all the logs downstream for _, record := range records { if err := d.sendRecord(record, stream); err != nil { @@ -102,15 +103,9 @@ func (d *Debug) sendRecord(record log.Record, stream server.Stream) error { metadata[k] = v } - pbRecord := &proto.Record{ + return stream.Send(&proto.Record{ Timestamp: record.Timestamp.Unix(), Value: record.Value.(string), Metadata: metadata, - } - - if err := stream.Send(pbRecord); err != nil { - return err - } - - return nil + }) } diff --git a/debug/service/log.go b/debug/service/log.go new file mode 100644 index 00000000..7a9aabc5 --- /dev/null +++ b/debug/service/log.go @@ -0,0 +1,81 @@ +package service + +import ( + "context" + "fmt" + "time" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/debug/log" + pb "github.com/micro/go-micro/debug/service/proto" +) + +type serviceLog struct { + Client *debugClient +} + +// Read reads log entries from the logger +func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record { + // TODO: parse opts + stream, err := s.Client.Log(opts...) + if err != nil { + return nil + } + // stream the records until nothing is left + var records []log.Record + for _, record := range stream { + records = append(records, record) + } + return records +} + +// There is no write support +func (s *serviceLog) Write(r log.Record) { + return +} + +// Stream log records +func (s *serviceLog) Stream(ch chan bool) (<-chan log.Record, chan bool) { + stop := make(chan bool) + stream, err := s.Client.Log(log.Stream(true)) + if err != nil { + // return a closed stream + stream = make(chan log.Record) + close(stream) + return stream, stop + } + + // stream the records until nothing is left + go func() { + var records []log.Record + for _, record := range stream { + select { + case stream <- record: + case <-stop: + return + } + } + }() + + // return the stream + return stream, stop +} + +// NewLog returns a new log interface +func NewLog(opts ...log.Option) log.Log { + var options log.Options + for _, o := range opts { + o(&options) + } + + name := options.Name + + // set the default name + if len(name) == 0 { + name = debug.DefaultName + } + + return serviceLog{ + Client: newDebugClient(name), + } +} diff --git a/debug/service/service.go b/debug/service/service.go index d48b0a3d..7527127a 100644 --- a/debug/service/service.go +++ b/debug/service/service.go @@ -1,3 +1,4 @@ +// Package service provides the service log package service import ( @@ -12,23 +13,23 @@ import ( ) // Debug provides debug service client -type Debug struct { - dbg pb.DebugService +type debugClient struct { + Client pb.DebugService } // NewDebug provides Debug service implementation -func NewDebug(name string) *Debug { +func newDebugClient(name string) *debug { // create default client cli := client.DefaultClient - return &Debug{ - dbg: pb.NewDebugService(name, cli), + return &debugClient{ + Client: pb.NewDebugService(name, cli), } } // Logs queries the service logs and returns a channel to read the logs from -func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) { - options := log.ReadOptions{} +func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) { + var options log.ReadOptions // initialize the read options for _, o := range opts { o(&options) @@ -46,20 +47,21 @@ func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) { req.Stream = options.Stream // get the log stream - stream, err := d.dbg.Log(context.Background(), req) + stream, err := d.Client.Log(context.Background(), req) if err != nil { return nil, fmt.Errorf("failed getting log stream: %s", err) } // log channel for streaming logs logChan := make(chan log.Record) + // go stream logs go d.streamLogs(logChan, stream) return logChan, nil } -func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { +func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) { defer stream.Close() for { diff --git a/debug/stats/default.go b/debug/stats/default.go index b2d78fce..357d6303 100644 --- a/debug/stats/default.go +++ b/debug/stats/default.go @@ -1,11 +1,11 @@ package stats import ( - "github.com/micro/go-micro/debug/buffer" + "github.com/micro/go-micro/util/ring" ) type stats struct { - buffer *buffer.Buffer + buffer *ring.Buffer } func (s *stats) Read() ([]*Stat, error) { @@ -33,6 +33,6 @@ func (s *stats) Write(stat *Stat) error { // TODO add options func NewStats() Stats { return &stats{ - buffer: buffer.New(1024), + buffer: ring.New(1024), } } diff --git a/debug/buffer/buffer.go b/util/ring/buffer.go similarity index 72% rename from debug/buffer/buffer.go rename to util/ring/buffer.go index 007293aa..bb8febcf 100644 --- a/debug/buffer/buffer.go +++ b/util/ring/buffer.go @@ -1,5 +1,5 @@ -// Package buffer provides a simple ring buffer for storing local data -package buffer +// Package ring provides a simple ring buffer for storing local data +package ring import ( "sync" @@ -8,18 +8,13 @@ import ( "github.com/google/uuid" ) -type stream struct { - id string - entries chan *Entry - stop chan bool -} - // Buffer is ring buffer type Buffer struct { size int + sync.RWMutex vals []*Entry - streams map[string]stream + streams map[string]*Stream } // Entry is ring buffer data entry @@ -28,12 +23,14 @@ type Entry struct { Timestamp time.Time } -// New returns a new buffer of the given size -func New(i int) *Buffer { - return &Buffer{ - size: i, - streams: make(map[string]stream), - } +// Stream is used to stream the buffer +type Stream struct { + // Id of the stream + Id string + // Buffered entries + Entries chan *Entry + // Stop channel + Stop chan bool } // Put adds a new value to ring buffer @@ -53,13 +50,13 @@ func (b *Buffer) Put(v interface{}) { b.vals = b.vals[1:] } - // TODO: this is fucking ugly + // send to every stream for _, stream := range b.streams { select { - case <-stream.stop: - delete(b.streams, stream.id) - close(stream.entries) - case stream.entries <- entry: + case <-stream.Stop: + delete(b.streams, stream.Id) + close(stream.Entries) + case stream.Entries <- entry: } } } @@ -115,22 +112,34 @@ func (b *Buffer) Since(t time.Time) []*Entry { } // Stream logs from the buffer -func (b *Buffer) Stream(stop chan bool) <-chan *Entry { +// Close the channel when you want to stop +func (b *Buffer) Stream() (<-chan *Entry, chan bool) { b.Lock() defer b.Unlock() entries := make(chan *Entry, 128) id := uuid.New().String() - b.streams[id] = stream{ - id: id, - entries: entries, - stop: stop, + stop := make(chan bool) + + b.streams[id] = &Stream{ + Id: id, + Entries: entries, + Stop: stop, } - return entries + return entries, stop } // Size returns the size of the ring buffer func (b *Buffer) Size() int { return b.size } + +// New returns a new buffer of the given size +func New(i int) *Buffer { + return &Buffer{ + size: i, + streams: make(map[string]*Stream), + } +} + diff --git a/debug/buffer/buffer_test.go b/util/ring/buffer_test.go similarity index 98% rename from debug/buffer/buffer_test.go rename to util/ring/buffer_test.go index 3a107923..0c9b5378 100644 --- a/debug/buffer/buffer_test.go +++ b/util/ring/buffer_test.go @@ -1,4 +1,4 @@ -package buffer +package ring import ( "testing" From d502e2f58abf344c50ceb4dc421cb9b248344b2b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 17 Dec 2019 15:46:09 +0000 Subject: [PATCH 2/3] fix breaks --- debug/log/log.go | 2 +- debug/log/options.go | 9 +++++++++ debug/service/log.go | 31 +++++++++++++------------------ debug/service/service.go | 2 +- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/debug/log/log.go b/debug/log/log.go index 9425ff19..918d0c55 100644 --- a/debug/log/log.go +++ b/debug/log/log.go @@ -174,6 +174,6 @@ func SetPrefix(p string) { } // Set service name -func Name(name string) { +func SetName(name string) { prefix = fmt.Sprintf("[%s]", name) } diff --git a/debug/log/options.go b/debug/log/options.go index 03dece38..0d83394b 100644 --- a/debug/log/options.go +++ b/debug/log/options.go @@ -7,10 +7,19 @@ type Option func(*Options) // Options are logger options type Options struct { + // Name of the log + Name string // Size is the size of ring buffer Size int } +// Name of the log +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + // Size sets the size of the ring buffer func Size(s int) Option { return func(o *Options) { diff --git a/debug/service/log.go b/debug/service/log.go index 7a9aabc5..3508874c 100644 --- a/debug/service/log.go +++ b/debug/service/log.go @@ -1,13 +1,8 @@ package service import ( - "context" - "fmt" - "time" - - "github.com/micro/go-micro/client" + "github.com/micro/go-micro/debug" "github.com/micro/go-micro/debug/log" - pb "github.com/micro/go-micro/debug/service/proto" ) type serviceLog struct { @@ -23,7 +18,7 @@ func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record { } // stream the records until nothing is left var records []log.Record - for _, record := range stream { + for record := range stream { records = append(records, record) } return records @@ -35,30 +30,30 @@ func (s *serviceLog) Write(r log.Record) { } // Stream log records -func (s *serviceLog) Stream(ch chan bool) (<-chan log.Record, chan bool) { +func (s *serviceLog) Stream() (<-chan log.Record, chan bool) { stop := make(chan bool) stream, err := s.Client.Log(log.Stream(true)) if err != nil { // return a closed stream - stream = make(chan log.Record) - close(stream) - return stream, stop + deadStream := make(chan log.Record) + close(deadStream) + return deadStream, stop } - // stream the records until nothing is left + newStream := make(chan log.Record, 128) + go func() { - var records []log.Record - for _, record := range stream { + for { select { - case stream <- record: + case rec := <-stream: + newStream <- rec case <-stop: return } } }() - // return the stream - return stream, stop + return newStream, stop } // NewLog returns a new log interface @@ -75,7 +70,7 @@ func NewLog(opts ...log.Option) log.Log { name = debug.DefaultName } - return serviceLog{ + return &serviceLog{ Client: newDebugClient(name), } } diff --git a/debug/service/service.go b/debug/service/service.go index 7527127a..7d73c753 100644 --- a/debug/service/service.go +++ b/debug/service/service.go @@ -18,7 +18,7 @@ type debugClient struct { } // NewDebug provides Debug service implementation -func newDebugClient(name string) *debug { +func newDebugClient(name string) *debugClient { // create default client cli := client.DefaultClient From b35dfb1086004dd862c29ec225244d1594e440fe Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 17 Dec 2019 15:56:49 +0000 Subject: [PATCH 3/3] fix further breaks --- debug/service/log.go | 2 +- debug/service/service.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/debug/service/log.go b/debug/service/log.go index 3508874c..647b349a 100644 --- a/debug/service/log.go +++ b/debug/service/log.go @@ -71,6 +71,6 @@ func NewLog(opts ...log.Option) log.Log { } return &serviceLog{ - Client: newDebugClient(name), + Client: NewClient(name), } } diff --git a/debug/service/service.go b/debug/service/service.go index 7d73c753..1da11c6e 100644 --- a/debug/service/service.go +++ b/debug/service/service.go @@ -17,8 +17,8 @@ type debugClient struct { Client pb.DebugService } -// NewDebug provides Debug service implementation -func newDebugClient(name string) *debugClient { +// NewClient provides a debug client +func NewClient(name string) *debugClient { // create default client cli := client.DefaultClient