commit
91e057440d
@ -4,17 +4,17 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
golog "log"
|
golog "log"
|
||||||
|
|
||||||
"github.com/micro/go-micro/debug/buffer"
|
"github.com/micro/go-micro/util/ring"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// DefaultSize of the logger buffer
|
// DefaultSize of the logger buffer
|
||||||
DefaultSize = 1000
|
DefaultSize = 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
// defaultLog is default micro log
|
// defaultLog is default micro log
|
||||||
type defaultLog struct {
|
type defaultLog struct {
|
||||||
*buffer.Buffer
|
*ring.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLog returns default Logger with
|
// NewLog returns default Logger with
|
||||||
@ -28,7 +28,7 @@ func NewLog(opts ...Option) Log {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &defaultLog{
|
return &defaultLog{
|
||||||
Buffer: buffer.New(options.Size),
|
Buffer: ring.New(options.Size),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,7 +46,7 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
|||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
var entries []*buffer.Entry
|
var entries []*ring.Entry
|
||||||
// if Since options ha sbeen specified we honor it
|
// if Since options ha sbeen specified we honor it
|
||||||
if !options.Since.IsZero() {
|
if !options.Since.IsZero() {
|
||||||
entries = l.Buffer.Since(options.Since)
|
entries = l.Buffer.Since(options.Since)
|
||||||
@ -82,9 +82,10 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stream returns channel for reading log records
|
// Stream returns channel for reading log records
|
||||||
func (l *defaultLog) Stream(stop chan bool) <-chan Record {
|
// along with a stop channel, close it when done
|
||||||
|
func (l *defaultLog) Stream() (<-chan Record, chan bool) {
|
||||||
// get stream channel from ring buffer
|
// get stream channel from ring buffer
|
||||||
stream := l.Buffer.Stream(stop)
|
stream, stop := l.Buffer.Stream()
|
||||||
// make a buffered channel
|
// make a buffered channel
|
||||||
records := make(chan Record, 128)
|
records := make(chan Record, 128)
|
||||||
// get last 10 records
|
// get last 10 records
|
||||||
@ -110,5 +111,5 @@ func (l *defaultLog) Stream(stop chan bool) <-chan Record {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return records
|
return records, stop
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ type Log interface {
|
|||||||
// Write writes records to log
|
// Write writes records to log
|
||||||
Write(Record)
|
Write(Record)
|
||||||
// Stream log records
|
// Stream log records
|
||||||
Stream(chan bool) <-chan Record
|
Stream() (<-chan Record, chan bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record is log record entry
|
// Record is log record entry
|
||||||
@ -174,6 +174,6 @@ func SetPrefix(p string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set service name
|
// Set service name
|
||||||
func Name(name string) {
|
func SetName(name string) {
|
||||||
prefix = fmt.Sprintf("[%s]", name)
|
prefix = fmt.Sprintf("[%s]", name)
|
||||||
}
|
}
|
||||||
|
@ -7,10 +7,19 @@ type Option func(*Options)
|
|||||||
|
|
||||||
// Options are logger options
|
// Options are logger options
|
||||||
type Options struct {
|
type Options struct {
|
||||||
|
// Name of the log
|
||||||
|
Name string
|
||||||
// Size is the size of ring buffer
|
// Size is the size of ring buffer
|
||||||
Size int
|
Size int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Name of the log
|
||||||
|
func Name(n string) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Name = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Size sets the size of the ring buffer
|
// Size sets the size of the ring buffer
|
||||||
func Size(s int) Option {
|
func Size(s int) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Pacjage handler implements service debug handler
|
// Package handler implements service debug handler embedded in go-micro services
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -66,26 +66,27 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if req.Stream {
|
if req.Stream {
|
||||||
stop := make(chan bool)
|
// TODO: we need to figure out how to close the log stream
|
||||||
defer close(stop)
|
|
||||||
|
|
||||||
// TODO: we need to figure out how to close ithe log stream
|
|
||||||
// It seems like when a client disconnects,
|
// It seems like when a client disconnects,
|
||||||
// the connection stays open until some timeout expires
|
// the connection stays open until some timeout expires
|
||||||
// or something like that; that means the map of streams
|
// or something like that; that means the map of streams
|
||||||
// might end up leaking memory if not cleaned up properly
|
// might end up leaking memory if not cleaned up properly
|
||||||
records := d.log.Stream(stop)
|
records, stop := d.log.Stream()
|
||||||
|
defer close(stop)
|
||||||
|
|
||||||
for record := range records {
|
for record := range records {
|
||||||
if err := d.sendRecord(record, stream); err != nil {
|
if err := d.sendRecord(record, stream); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// done streaming, return
|
// done streaming, return
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the log records
|
// get the log records
|
||||||
records := d.log.Read(options...)
|
records := d.log.Read(options...)
|
||||||
|
|
||||||
// send all the logs downstream
|
// send all the logs downstream
|
||||||
for _, record := range records {
|
for _, record := range records {
|
||||||
if err := d.sendRecord(record, stream); err != nil {
|
if err := d.sendRecord(record, stream); err != nil {
|
||||||
@ -102,15 +103,9 @@ func (d *Debug) sendRecord(record log.Record, stream server.Stream) error {
|
|||||||
metadata[k] = v
|
metadata[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
pbRecord := &proto.Record{
|
return stream.Send(&proto.Record{
|
||||||
Timestamp: record.Timestamp.Unix(),
|
Timestamp: record.Timestamp.Unix(),
|
||||||
Value: record.Value.(string),
|
Value: record.Value.(string),
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
}
|
})
|
||||||
|
|
||||||
if err := stream.Send(pbRecord); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
76
debug/service/log.go
Normal file
76
debug/service/log.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/micro/go-micro/debug"
|
||||||
|
"github.com/micro/go-micro/debug/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type serviceLog struct {
|
||||||
|
Client *debugClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read reads log entries from the logger
|
||||||
|
func (s *serviceLog) Read(opts ...log.ReadOption) []log.Record {
|
||||||
|
// TODO: parse opts
|
||||||
|
stream, err := s.Client.Log(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// stream the records until nothing is left
|
||||||
|
var records []log.Record
|
||||||
|
for record := range stream {
|
||||||
|
records = append(records, record)
|
||||||
|
}
|
||||||
|
return records
|
||||||
|
}
|
||||||
|
|
||||||
|
// There is no write support
|
||||||
|
func (s *serviceLog) Write(r log.Record) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream log records
|
||||||
|
func (s *serviceLog) Stream() (<-chan log.Record, chan bool) {
|
||||||
|
stop := make(chan bool)
|
||||||
|
stream, err := s.Client.Log(log.Stream(true))
|
||||||
|
if err != nil {
|
||||||
|
// return a closed stream
|
||||||
|
deadStream := make(chan log.Record)
|
||||||
|
close(deadStream)
|
||||||
|
return deadStream, stop
|
||||||
|
}
|
||||||
|
|
||||||
|
newStream := make(chan log.Record, 128)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case rec := <-stream:
|
||||||
|
newStream <- rec
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return newStream, stop
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLog returns a new log interface
|
||||||
|
func NewLog(opts ...log.Option) log.Log {
|
||||||
|
var options log.Options
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
name := options.Name
|
||||||
|
|
||||||
|
// set the default name
|
||||||
|
if len(name) == 0 {
|
||||||
|
name = debug.DefaultName
|
||||||
|
}
|
||||||
|
|
||||||
|
return &serviceLog{
|
||||||
|
Client: NewClient(name),
|
||||||
|
}
|
||||||
|
}
|
@ -1,3 +1,4 @@
|
|||||||
|
// Package service provides the service log
|
||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -12,23 +13,23 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Debug provides debug service client
|
// Debug provides debug service client
|
||||||
type Debug struct {
|
type debugClient struct {
|
||||||
dbg pb.DebugService
|
Client pb.DebugService
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDebug provides Debug service implementation
|
// NewClient provides a debug client
|
||||||
func NewDebug(name string) *Debug {
|
func NewClient(name string) *debugClient {
|
||||||
// create default client
|
// create default client
|
||||||
cli := client.DefaultClient
|
cli := client.DefaultClient
|
||||||
|
|
||||||
return &Debug{
|
return &debugClient{
|
||||||
dbg: pb.NewDebugService(name, cli),
|
Client: pb.NewDebugService(name, cli),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logs queries the service logs and returns a channel to read the logs from
|
// Logs queries the service logs and returns a channel to read the logs from
|
||||||
func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
func (d *debugClient) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
||||||
options := log.ReadOptions{}
|
var options log.ReadOptions
|
||||||
// initialize the read options
|
// initialize the read options
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
@ -46,20 +47,21 @@ func (d *Debug) Log(opts ...log.ReadOption) (<-chan log.Record, error) {
|
|||||||
req.Stream = options.Stream
|
req.Stream = options.Stream
|
||||||
|
|
||||||
// get the log stream
|
// get the log stream
|
||||||
stream, err := d.dbg.Log(context.Background(), req)
|
stream, err := d.Client.Log(context.Background(), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed getting log stream: %s", err)
|
return nil, fmt.Errorf("failed getting log stream: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// log channel for streaming logs
|
// log channel for streaming logs
|
||||||
logChan := make(chan log.Record)
|
logChan := make(chan log.Record)
|
||||||
|
|
||||||
// go stream logs
|
// go stream logs
|
||||||
go d.streamLogs(logChan, stream)
|
go d.streamLogs(logChan, stream)
|
||||||
|
|
||||||
return logChan, nil
|
return logChan, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
|
func (d *debugClient) streamLogs(logChan chan log.Record, stream pb.Debug_LogService) {
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
package stats
|
package stats
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/micro/go-micro/debug/buffer"
|
"github.com/micro/go-micro/util/ring"
|
||||||
)
|
)
|
||||||
|
|
||||||
type stats struct {
|
type stats struct {
|
||||||
buffer *buffer.Buffer
|
buffer *ring.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stats) Read() ([]*Stat, error) {
|
func (s *stats) Read() ([]*Stat, error) {
|
||||||
@ -33,6 +33,6 @@ func (s *stats) Write(stat *Stat) error {
|
|||||||
// TODO add options
|
// TODO add options
|
||||||
func NewStats() Stats {
|
func NewStats() Stats {
|
||||||
return &stats{
|
return &stats{
|
||||||
buffer: buffer.New(1024),
|
buffer: ring.New(1024),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
// Package buffer provides a simple ring buffer for storing local data
|
// Package ring provides a simple ring buffer for storing local data
|
||||||
package buffer
|
package ring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
@ -8,18 +8,13 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type stream struct {
|
|
||||||
id string
|
|
||||||
entries chan *Entry
|
|
||||||
stop chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Buffer is ring buffer
|
// Buffer is ring buffer
|
||||||
type Buffer struct {
|
type Buffer struct {
|
||||||
size int
|
size int
|
||||||
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
vals []*Entry
|
vals []*Entry
|
||||||
streams map[string]stream
|
streams map[string]*Stream
|
||||||
}
|
}
|
||||||
|
|
||||||
// Entry is ring buffer data entry
|
// Entry is ring buffer data entry
|
||||||
@ -28,12 +23,14 @@ type Entry struct {
|
|||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new buffer of the given size
|
// Stream is used to stream the buffer
|
||||||
func New(i int) *Buffer {
|
type Stream struct {
|
||||||
return &Buffer{
|
// Id of the stream
|
||||||
size: i,
|
Id string
|
||||||
streams: make(map[string]stream),
|
// Buffered entries
|
||||||
}
|
Entries chan *Entry
|
||||||
|
// Stop channel
|
||||||
|
Stop chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put adds a new value to ring buffer
|
// Put adds a new value to ring buffer
|
||||||
@ -53,13 +50,13 @@ func (b *Buffer) Put(v interface{}) {
|
|||||||
b.vals = b.vals[1:]
|
b.vals = b.vals[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this is fucking ugly
|
// send to every stream
|
||||||
for _, stream := range b.streams {
|
for _, stream := range b.streams {
|
||||||
select {
|
select {
|
||||||
case <-stream.stop:
|
case <-stream.Stop:
|
||||||
delete(b.streams, stream.id)
|
delete(b.streams, stream.Id)
|
||||||
close(stream.entries)
|
close(stream.Entries)
|
||||||
case stream.entries <- entry:
|
case stream.Entries <- entry:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,22 +112,34 @@ func (b *Buffer) Since(t time.Time) []*Entry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stream logs from the buffer
|
// Stream logs from the buffer
|
||||||
func (b *Buffer) Stream(stop chan bool) <-chan *Entry {
|
// Close the channel when you want to stop
|
||||||
|
func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
|
||||||
b.Lock()
|
b.Lock()
|
||||||
defer b.Unlock()
|
defer b.Unlock()
|
||||||
|
|
||||||
entries := make(chan *Entry, 128)
|
entries := make(chan *Entry, 128)
|
||||||
id := uuid.New().String()
|
id := uuid.New().String()
|
||||||
b.streams[id] = stream{
|
stop := make(chan bool)
|
||||||
id: id,
|
|
||||||
entries: entries,
|
b.streams[id] = &Stream{
|
||||||
stop: stop,
|
Id: id,
|
||||||
|
Entries: entries,
|
||||||
|
Stop: stop,
|
||||||
}
|
}
|
||||||
|
|
||||||
return entries
|
return entries, stop
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the size of the ring buffer
|
// Size returns the size of the ring buffer
|
||||||
func (b *Buffer) Size() int {
|
func (b *Buffer) Size() int {
|
||||||
return b.size
|
return b.size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// New returns a new buffer of the given size
|
||||||
|
func New(i int) *Buffer {
|
||||||
|
return &Buffer{
|
||||||
|
size: i,
|
||||||
|
streams: make(map[string]*Stream),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
|||||||
package buffer
|
package ring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
Loading…
x
Reference in New Issue
Block a user