Simplified Logs RPC. Cleaned up code. Added comments.
This commit is contained in:
		| @@ -6,12 +6,14 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // Buffer is ring buffer | ||||||
| type Buffer struct { | type Buffer struct { | ||||||
| 	size int | 	size int | ||||||
| 	sync.RWMutex | 	sync.RWMutex | ||||||
| 	vals []*Entry | 	vals []*Entry | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Entry is ring buffer data entry | ||||||
| type Entry struct { | type Entry struct { | ||||||
| 	Value     interface{} | 	Value     interface{} | ||||||
| 	Timestamp time.Time | 	Timestamp time.Time | ||||||
| @@ -43,14 +45,14 @@ func (b *Buffer) Put(v interface{}) { | |||||||
|  |  | ||||||
| // Get returns the last n entries | // Get returns the last n entries | ||||||
| func (b *Buffer) Get(n int) []*Entry { | func (b *Buffer) Get(n int) []*Entry { | ||||||
|  | 	b.RLock() | ||||||
|  | 	defer b.RUnlock() | ||||||
|  |  | ||||||
| 	// reset any invalid values | 	// reset any invalid values | ||||||
| 	if n > b.size || n < 0 { | 	if n > b.size || n < 0 { | ||||||
| 		n = b.size | 		n = b.size | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	b.RLock() |  | ||||||
| 	defer b.RUnlock() |  | ||||||
|  |  | ||||||
| 	// create a delta | 	// create a delta | ||||||
| 	delta := b.size - n | 	delta := b.size - n | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | // Pacjage handler implements service debug handler | ||||||
| package handler | package handler | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| @@ -46,15 +47,23 @@ func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.S | |||||||
| } | } | ||||||
|  |  | ||||||
| func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.Debug_LogsStream) error { | func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.Debug_LogsStream) error { | ||||||
| 	var records []log.Record | 	var options []log.ReadOption | ||||||
|  |  | ||||||
| 	since := time.Unix(0, req.Since) | 	since := time.Unix(0, req.Since) | ||||||
| 	if !since.IsZero() { | 	if !since.IsZero() { | ||||||
| 		records = d.log.Read(log.Since(since)) | 		options = append(options, log.Since(since)) | ||||||
| 	} else { |  | ||||||
| 		records = d.log.Read(log.Count(int(req.Count))) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// TODO: figure out the stream later on | 	count := int(req.Count) | ||||||
|  | 	if count > 0 { | ||||||
|  | 		options = append(options, log.Count(count)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// get the log records | ||||||
|  | 	records := d.log.Read(options...) | ||||||
|  |  | ||||||
|  | 	// TODO: figure out the stream | ||||||
|  |  | ||||||
| 	for _, record := range records { | 	for _, record := range records { | ||||||
| 		metadata := make(map[string]string) | 		metadata := make(map[string]string) | ||||||
| 		for k, v := range record.Metadata { | 		for k, v := range record.Metadata { | ||||||
|   | |||||||
| @@ -50,13 +50,24 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { | |||||||
| 	// if Since options ha sbeen specified we honor it | 	// if Since options ha sbeen specified we honor it | ||||||
| 	if !options.Since.IsZero() { | 	if !options.Since.IsZero() { | ||||||
| 		entries = l.Buffer.Since(options.Since) | 		entries = l.Buffer.Since(options.Since) | ||||||
| 	} else { |  | ||||||
| 		// otherwie return last count entries |  | ||||||
| 		entries = l.Buffer.Get(options.Count) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// TODO: if both Since and Count are set should we return? | 	// only if we specified valid count constraint | ||||||
| 	// last Count from the returned time scoped entries? | 	// do we do some serious if-else kung-fu | ||||||
|  | 	// if since has been given we return *count* number of | ||||||
|  | 	// logs since the requested timestamp; | ||||||
|  | 	// otherwise we retourn last count number of logs | ||||||
|  | 	if options.Count > 0 { | ||||||
|  | 		switch len(entries) > 0 { | ||||||
|  | 		case true: | ||||||
|  | 			// if we request fewer logs than what since constraint gives us | ||||||
|  | 			if options.Count < len(entries) { | ||||||
|  | 				entries = entries[0:options.Count] | ||||||
|  | 			} | ||||||
|  | 		default: | ||||||
|  | 			entries = l.Buffer.Get(options.Count) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	records := make([]Record, 0, len(entries)) | 	records := make([]Record, 0, len(entries)) | ||||||
| 	for _, entry := range entries { | 	for _, entry := range entries { | ||||||
| @@ -66,5 +77,6 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record { | |||||||
| 		} | 		} | ||||||
| 		records = append(records, record) | 		records = append(records, record) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return records | 	return records | ||||||
| } | } | ||||||
|   | |||||||
| @@ -9,21 +9,21 @@ func TestLogger(t *testing.T) { | |||||||
| 	// set size to some value | 	// set size to some value | ||||||
| 	size := 100 | 	size := 100 | ||||||
| 	// override the global logger | 	// override the global logger | ||||||
| 	logger = NewLog(Size(size)) | 	DefaultLog = NewLog(Size(size)) | ||||||
| 	// make sure we have the right size of the logger ring buffer | 	// make sure we have the right size of the logger ring buffer | ||||||
| 	if logger.(*defaultLog).Size() != size { | 	if DefaultLog.(*defaultLog).Size() != size { | ||||||
| 		t.Errorf("expected buffer size: %d, got: %d", size, logger.(*defaultLog).Size()) | 		t.Errorf("expected buffer size: %d, got: %d", size, DefaultLog.(*defaultLog).Size()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Log some cruft | 	// Log some cruft | ||||||
| 	Info("foobar") | 	Info("foobar") | ||||||
| 	// increase the log level | 	// increase the log level | ||||||
| 	level = LevelDebug | 	DefaultLevel = LevelDebug | ||||||
| 	Debugf("foo %s", "bar") | 	Debugf("foo %s", "bar") | ||||||
|  |  | ||||||
| 	// Check if the logs are stored in the logger ring buffer | 	// Check if the logs are stored in the logger ring buffer | ||||||
| 	expected := []string{"foobar", "foo bar"} | 	expected := []string{"foobar", "foo bar"} | ||||||
| 	entries := logger.Read(Count(len(expected))) | 	entries := DefaultLog.Read(Count(len(expected))) | ||||||
| 	for i, entry := range entries { | 	for i, entry := range entries { | ||||||
| 		if !reflect.DeepEqual(entry.Value, expected[i]) { | 		if !reflect.DeepEqual(entry.Value, expected[i]) { | ||||||
| 			t.Errorf("expected %s, got %s", expected[i], entry.Value) | 			t.Errorf("expected %s, got %s", expected[i], entry.Value) | ||||||
|   | |||||||
| @@ -31,6 +31,8 @@ type ReadOptions struct { | |||||||
| 	Since time.Time | 	Since time.Time | ||||||
| 	// Count specifies number of logs to return | 	// Count specifies number of logs to return | ||||||
| 	Count int | 	Count int | ||||||
|  | 	// Stream requests continuous log stream | ||||||
|  | 	Stream bool | ||||||
| } | } | ||||||
|  |  | ||||||
| // ReadOption used for reading the logs | // ReadOption used for reading the logs | ||||||
| @@ -49,3 +51,10 @@ func Count(c int) ReadOption { | |||||||
| 		o.Count = c | 		o.Count = c | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Stream requests continuous log stream | ||||||
|  | func Stream(s bool) ReadOption { | ||||||
|  | 	return func(o *ReadOptions) { | ||||||
|  | 		o.Stream = s | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -26,6 +26,7 @@ func NewDebug(name string) *Debug { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Logs queries the service logs and returns a channel to read the logs from | ||||||
| func (d *Debug) Logs(opts ...log.ReadOption) (<-chan log.Record, error) { | func (d *Debug) Logs(opts ...log.ReadOption) (<-chan log.Record, error) { | ||||||
| 	options := log.ReadOptions{} | 	options := log.ReadOptions{} | ||||||
| 	// initialize the read options | 	// initialize the read options | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user