Small refactoring og logs

* log.Write now accepts log.Record
* we stream last 10 records first
* regenerate proto because of the above
This commit is contained in:
Milos Gajdos 2019-12-01 13:15:10 +00:00
parent ecdadef633
commit 4613a820ca
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
7 changed files with 98 additions and 88 deletions

View File

@ -49,7 +49,7 @@ func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.S
func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.Debug_LogsStream) error { func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.Debug_LogsStream) error {
var options []log.ReadOption var options []log.ReadOption
since := time.Unix(0, req.Since) since := time.Unix(req.Since, 0)
if !since.IsZero() { if !since.IsZero() {
options = append(options, log.Since(since)) options = append(options, log.Since(since))
} }
@ -63,11 +63,11 @@ func (d *Debug) Logs(ctx context.Context, req *proto.LogRequest, stream proto.De
stop := make(chan bool) stop := make(chan bool)
defer close(stop) defer close(stop)
// TODO: figure out how to close log stream // TODO: we need to figure out how to close ithe log stream
// It seems when the client disconnects, // It seems like when a client disconnects,
// the connection stays open until some timeout expires // the connection stays open until some timeout expires
// or something like that; that means the map of streams // or something like that; that means the map of streams
// might end up bloating if not cleaned up properly // might end up leaking memory if not cleaned up properly
records := d.log.Stream(stop) records := d.log.Stream(stop)
for record := range records { for record := range records {
if err := d.sendRecord(record, stream); err != nil { if err := d.sendRecord(record, stream); err != nil {
@ -96,13 +96,13 @@ func (d *Debug) sendRecord(record log.Record, stream proto.Debug_LogsStream) err
metadata[k] = v metadata[k] = v
} }
recLog := &proto.Log{ pbRecord := &proto.Record{
Timestamp: record.Timestamp.UnixNano(), Timestamp: record.Timestamp.Unix(),
Value: record.Value.(string), Value: record.Value.(string),
Metadata: metadata, Metadata: metadata,
} }
if err := stream.Send(recLog); err != nil { if err := stream.Send(pbRecord); err != nil {
return err return err
} }

View File

@ -33,9 +33,9 @@ func NewLog(opts ...Option) Log {
} }
// Write writes logs into logger // Write writes logs into logger
func (l *defaultLog) Write(v ...interface{}) { func (l *defaultLog) Write(r Record) {
golog.Print(v...) golog.Print(r.Value)
l.Buffer.Put(fmt.Sprint(v...)) l.Buffer.Put(fmt.Sprint(r.Value))
} }
// Read reads logs and returns them // Read reads logs and returns them
@ -85,12 +85,22 @@ func (l *defaultLog) Read(opts ...ReadOption) []Record {
func (l *defaultLog) Stream(stop chan bool) <-chan Record { func (l *defaultLog) Stream(stop chan bool) <-chan Record {
// get stream channel from ring buffer // get stream channel from ring buffer
stream := l.Buffer.Stream(stop) stream := l.Buffer.Stream(stop)
records := make(chan Record) // make a buffered channel
records := make(chan Record, 128)
fmt.Println("requested log stream") // get last 10 records
last10 := l.Buffer.Get(10)
// stream the log records // stream the log records
go func() { go func() {
// first send last 10 records
for _, entry := range last10 {
records <- Record{
Timestamp: entry.Timestamp,
Value: entry.Value,
Metadata: make(map[string]string),
}
}
// now stream continuously
for entry := range stream { for entry := range stream {
records <- Record{ records <- Record{
Timestamp: entry.Timestamp, Timestamp: entry.Timestamp,

View File

@ -20,9 +20,9 @@ var (
type Log interface { type Log interface {
// Read reads log entries from the logger // Read reads log entries from the logger
Read(...ReadOption) []Record Read(...ReadOption) []Record
// Write writes logs to logger // Write writes records to log
Write(...interface{}) Write(Record)
// Stream logs // Stream log records
Stream(chan bool) <-chan Record Stream(chan bool) <-chan Record
} }
@ -67,17 +67,17 @@ func init() {
func log(v ...interface{}) { func log(v ...interface{}) {
if len(prefix) > 0 { if len(prefix) > 0 {
DefaultLog.Write(fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)) DefaultLog.Write(Record{Value: fmt.Sprint(append([]interface{}{prefix, " "}, v...)...)})
return return
} }
DefaultLog.Write(fmt.Sprint(v...)) DefaultLog.Write(Record{Value: fmt.Sprint(v...)})
} }
func logf(format string, v ...interface{}) { func logf(format string, v ...interface{}) {
if len(prefix) > 0 { if len(prefix) > 0 {
format = prefix + " " + format format = prefix + " " + format
} }
DefaultLog.Write(fmt.Sprintf(format, v...)) DefaultLog.Write(Record{Value: fmt.Sprintf(format, v...)})
} }
// WithLevel logs with the level specified // WithLevel logs with the level specified

View File

@ -198,15 +198,15 @@ func (m *StatsResponse) GetGc() uint64 {
return 0 return 0
} }
// LogRequest queries service for logs // LogRequest requests service logs
type LogRequest struct { type LogRequest struct {
// count is the count of events // count of records to request
Count int64 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` Count int64 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
// relative time in seconds // relative time in seconds
// before the current time // before the current time
// from which to show logs // from which to show logs
Since int64 `protobuf:"varint,2,opt,name=since,proto3" json:"since,omitempty"` Since int64 `protobuf:"varint,2,opt,name=since,proto3" json:"since,omitempty"`
// stream logs continuously // stream records continuously
Stream bool `protobuf:"varint,3,opt,name=stream,proto3" json:"stream,omitempty"` Stream bool `protobuf:"varint,3,opt,name=stream,proto3" json:"stream,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
@ -259,59 +259,59 @@ func (m *LogRequest) GetStream() bool {
return false return false
} }
// Log is service log record // Record is service log record
type Log struct { type Record struct {
// timestamp of log event // timestamp of log record
Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// log value // record value
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
// metadata // record metadata
Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *Log) Reset() { *m = Log{} } func (m *Record) Reset() { *m = Record{} }
func (m *Log) String() string { return proto.CompactTextString(m) } func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Log) ProtoMessage() {} func (*Record) ProtoMessage() {}
func (*Log) Descriptor() ([]byte, []int) { func (*Record) Descriptor() ([]byte, []int) {
return fileDescriptor_8d9d361be58531fb, []int{5} return fileDescriptor_8d9d361be58531fb, []int{5}
} }
func (m *Log) XXX_Unmarshal(b []byte) error { func (m *Record) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Log.Unmarshal(m, b) return xxx_messageInfo_Record.Unmarshal(m, b)
} }
func (m *Log) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Log.Marshal(b, m, deterministic) return xxx_messageInfo_Record.Marshal(b, m, deterministic)
} }
func (m *Log) XXX_Merge(src proto.Message) { func (m *Record) XXX_Merge(src proto.Message) {
xxx_messageInfo_Log.Merge(m, src) xxx_messageInfo_Record.Merge(m, src)
} }
func (m *Log) XXX_Size() int { func (m *Record) XXX_Size() int {
return xxx_messageInfo_Log.Size(m) return xxx_messageInfo_Record.Size(m)
} }
func (m *Log) XXX_DiscardUnknown() { func (m *Record) XXX_DiscardUnknown() {
xxx_messageInfo_Log.DiscardUnknown(m) xxx_messageInfo_Record.DiscardUnknown(m)
} }
var xxx_messageInfo_Log proto.InternalMessageInfo var xxx_messageInfo_Record proto.InternalMessageInfo
func (m *Log) GetTimestamp() int64 { func (m *Record) GetTimestamp() int64 {
if m != nil { if m != nil {
return m.Timestamp return m.Timestamp
} }
return 0 return 0
} }
func (m *Log) GetValue() string { func (m *Record) GetValue() string {
if m != nil { if m != nil {
return m.Value return m.Value
} }
return "" return ""
} }
func (m *Log) GetMetadata() map[string]string { func (m *Record) GetMetadata() map[string]string {
if m != nil { if m != nil {
return m.Metadata return m.Metadata
} }
@ -324,35 +324,35 @@ func init() {
proto.RegisterType((*StatsRequest)(nil), "StatsRequest") proto.RegisterType((*StatsRequest)(nil), "StatsRequest")
proto.RegisterType((*StatsResponse)(nil), "StatsResponse") proto.RegisterType((*StatsResponse)(nil), "StatsResponse")
proto.RegisterType((*LogRequest)(nil), "LogRequest") proto.RegisterType((*LogRequest)(nil), "LogRequest")
proto.RegisterType((*Log)(nil), "Log") proto.RegisterType((*Record)(nil), "Record")
proto.RegisterMapType((map[string]string)(nil), "Log.MetadataEntry") proto.RegisterMapType((map[string]string)(nil), "Record.MetadataEntry")
} }
func init() { proto.RegisterFile("debug.proto", fileDescriptor_8d9d361be58531fb) } func init() { proto.RegisterFile("debug.proto", fileDescriptor_8d9d361be58531fb) }
var fileDescriptor_8d9d361be58531fb = []byte{ var fileDescriptor_8d9d361be58531fb = []byte{
// 360 bytes of a gzipped FileDescriptorProto // 364 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x5f, 0x6b, 0xdb, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x41, 0x6b, 0xdb, 0x30,
0x14, 0xc5, 0xe3, 0x7f, 0x59, 0x72, 0x33, 0x3b, 0x43, 0x8c, 0x61, 0xcc, 0x06, 0x41, 0x4f, 0x86, 0x1c, 0xc5, 0x63, 0x3b, 0x76, 0x92, 0x7f, 0x66, 0x67, 0x88, 0x6d, 0x18, 0xb3, 0x43, 0xd0, 0xc9,
0x81, 0x18, 0xd9, 0xcb, 0xd8, 0x5e, 0x57, 0xe8, 0x83, 0x0b, 0x45, 0xfd, 0x04, 0x8a, 0x2d, 0x9c, 0x30, 0x10, 0x5b, 0x76, 0x19, 0xdb, 0x75, 0x85, 0x1e, 0x52, 0x28, 0xea, 0x27, 0x50, 0x6c, 0xe1,
0xd0, 0xd8, 0x72, 0xad, 0xeb, 0x42, 0x1e, 0xfa, 0xd4, 0xef, 0xd2, 0xcf, 0x59, 0x64, 0x29, 0x4d, 0x84, 0xc6, 0x96, 0x6b, 0xfd, 0x5d, 0xc8, 0xad, 0xd0, 0xaf, 0xd3, 0x0f, 0x59, 0x64, 0x29, 0x4d,
0x0d, 0x7d, 0xf3, 0xef, 0xc8, 0xf7, 0x1c, 0xf9, 0x1e, 0xc3, 0xaa, 0x92, 0xbb, 0xa1, 0x66, 0x5d, 0x0d, 0xbd, 0xf9, 0xf7, 0xe4, 0xff, 0x7b, 0x92, 0x9e, 0x60, 0x59, 0xca, 0x5d, 0x5f, 0xb1, 0xb6,
0xaf, 0x50, 0xd1, 0x35, 0xc4, 0xd7, 0x52, 0x1c, 0x71, 0xcf, 0xe5, 0xc3, 0x20, 0x35, 0xd2, 0x1c, 0x53, 0xa8, 0xe8, 0x0a, 0xe2, 0x6b, 0x29, 0x8e, 0xb8, 0xe7, 0xf2, 0xa1, 0x97, 0x1a, 0x69, 0x0e,
0x92, 0xb3, 0xa0, 0x3b, 0xd5, 0x6a, 0x49, 0xbe, 0xc1, 0x5c, 0xa3, 0xc0, 0x41, 0xa7, 0xde, 0xc6, 0xc9, 0x59, 0xd0, 0xad, 0x6a, 0xb4, 0x24, 0xdf, 0x20, 0xd2, 0x28, 0xb0, 0xd7, 0xa9, 0xb7, 0xf6,
0xcb, 0x97, 0xdc, 0x11, 0x4d, 0xe0, 0xf3, 0x1d, 0x0a, 0xd4, 0xe7, 0xc9, 0x67, 0x0f, 0x62, 0x27, 0xf2, 0x05, 0x77, 0x44, 0x13, 0xf8, 0x74, 0x87, 0x02, 0xf5, 0x79, 0xf2, 0xd9, 0x83, 0xd8, 0x09,
0xb8, 0xc9, 0x14, 0x3e, 0x69, 0x14, 0x3d, 0xca, 0x6a, 0x1c, 0x0d, 0xf9, 0x19, 0x8d, 0xe7, 0xd0, 0x6e, 0x32, 0x85, 0x99, 0x46, 0xd1, 0xa1, 0x2c, 0x87, 0xd1, 0x29, 0x3f, 0xa3, 0xf1, 0xec, 0x5b,
0xe1, 0xa1, 0x91, 0xa9, 0x3f, 0x1e, 0x38, 0x32, 0x7a, 0x23, 0x1b, 0xd5, 0x9f, 0xd2, 0xc0, 0xea, 0x3c, 0xd4, 0x32, 0xf5, 0x87, 0x05, 0x47, 0x46, 0xaf, 0x65, 0xad, 0xba, 0x53, 0x1a, 0x58, 0xdd,
0x96, 0x8c, 0x13, 0xee, 0x7b, 0x29, 0x2a, 0x9d, 0x86, 0xd6, 0xc9, 0x21, 0x49, 0xc0, 0xaf, 0xcb, 0x92, 0x71, 0xc2, 0x7d, 0x27, 0x45, 0xa9, 0xd3, 0xa9, 0x75, 0x72, 0x48, 0x12, 0xf0, 0xab, 0x22,
0x34, 0x1a, 0x45, 0xbf, 0x2e, 0xe9, 0x2d, 0x40, 0xa1, 0x6a, 0x77, 0x27, 0xf2, 0x15, 0xa2, 0x52, 0x0d, 0x07, 0xd1, 0xaf, 0x0a, 0x7a, 0x0b, 0xb0, 0x55, 0x95, 0xdb, 0x13, 0xf9, 0x02, 0x61, 0xa1,
0x0d, 0x2d, 0x8e, 0xf9, 0x01, 0xb7, 0x60, 0x54, 0x7d, 0x68, 0x4b, 0x1b, 0x1e, 0x70, 0x0b, 0xf6, 0xfa, 0x06, 0x87, 0xfc, 0x80, 0x5b, 0x30, 0xaa, 0x3e, 0x34, 0x85, 0x0d, 0x0f, 0xb8, 0x05, 0x7b,
0x3b, 0x7b, 0x29, 0x9a, 0x31, 0x7b, 0xc1, 0x1d, 0xd1, 0x17, 0x0f, 0x82, 0x42, 0xd5, 0xe4, 0x3b, 0xce, 0x4e, 0x8a, 0x7a, 0xc8, 0x9e, 0x73, 0x47, 0xf4, 0xc5, 0x83, 0x88, 0xcb, 0x42, 0x75, 0x25,
0x2c, 0xcd, 0x1d, 0x35, 0x8a, 0xa6, 0x73, 0x7e, 0x17, 0xc1, 0x78, 0x3e, 0x8a, 0xe3, 0x60, 0x3d, 0xf9, 0x0e, 0x0b, 0xb3, 0x4d, 0x8d, 0xa2, 0x6e, 0x9d, 0xe5, 0x45, 0x30, 0xb6, 0x8f, 0xe2, 0xd8,
0x97, 0xdc, 0x02, 0x61, 0xb0, 0x68, 0x24, 0x8a, 0x4a, 0xa0, 0x48, 0x83, 0x4d, 0x90, 0xaf, 0xb6, 0x5b, 0xdb, 0x05, 0xb7, 0x40, 0x7e, 0xc1, 0xbc, 0x96, 0x28, 0x4a, 0x81, 0x22, 0x0d, 0xd6, 0x41,
0x84, 0x15, 0xaa, 0x66, 0x37, 0x4e, 0xbc, 0x6a, 0xb1, 0x3f, 0xf1, 0xb7, 0x77, 0xb2, 0x7f, 0x10, 0xbe, 0xdc, 0x7c, 0x65, 0xd6, 0x8e, 0xdd, 0x38, 0xfd, 0xaa, 0xc1, 0xee, 0xc4, 0xdf, 0x7e, 0xcb,
0x4f, 0x8e, 0xc8, 0x17, 0x08, 0xee, 0xe5, 0xc9, 0x6d, 0xde, 0x3c, 0x7e, 0x1c, 0xf4, 0xd7, 0xff, 0xfe, 0x41, 0x3c, 0x5a, 0x22, 0x9f, 0x21, 0xb8, 0x97, 0x27, 0x77, 0xff, 0xe6, 0xf3, 0xe3, 0xac,
0xe3, 0x6d, 0x9f, 0x20, 0xfa, 0x6f, 0xaa, 0x25, 0x3f, 0x61, 0x6e, 0x3b, 0x24, 0x09, 0x9b, 0xb4, 0xbf, 0xfe, 0x1f, 0x6f, 0xf3, 0xe4, 0x41, 0xf8, 0xdf, 0x34, 0x4c, 0x7e, 0x40, 0x64, 0xab, 0x24,
0x9b, 0xad, 0xd9, 0xb4, 0x5c, 0x3a, 0x23, 0x39, 0x44, 0x63, 0x6b, 0x24, 0x66, 0xef, 0xeb, 0xcc, 0x09, 0x1b, 0x95, 0x9c, 0xad, 0xd8, 0xb8, 0x63, 0x3a, 0x21, 0x39, 0x84, 0x43, 0x79, 0x24, 0x66,
0x12, 0x36, 0x29, 0x93, 0xce, 0xc8, 0x0f, 0x08, 0x0b, 0x55, 0x6b, 0xb2, 0x62, 0x97, 0x0d, 0x67, 0xef, 0x5b, 0xcd, 0x12, 0x36, 0xea, 0x94, 0x4e, 0xc8, 0x1a, 0xa6, 0x5b, 0x55, 0x69, 0xb2, 0x64,
0xa1, 0x01, 0x3a, 0xfb, 0xe5, 0xed, 0xe6, 0xe3, 0x1f, 0xf5, 0xfb, 0x35, 0x00, 0x00, 0xff, 0xff, 0x97, 0x8b, 0xce, 0x66, 0xee, 0x4c, 0x74, 0xf2, 0xd3, 0xdb, 0x45, 0xc3, 0xdb, 0xfa, 0xfd, 0x1a,
0x95, 0x24, 0xad, 0xbc, 0x60, 0x02, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xea, 0x2d, 0x15, 0xdb, 0x6a, 0x02, 0x00, 0x00,
} }

View File

@ -93,7 +93,7 @@ type Debug_LogsService interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
Recv() (*Log, error) Recv() (*Record, error)
} }
type debugServiceLogs struct { type debugServiceLogs struct {
@ -112,8 +112,8 @@ func (x *debugServiceLogs) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *debugServiceLogs) Recv() (*Log, error) { func (x *debugServiceLogs) Recv() (*Record, error) {
m := new(Log) m := new(Record)
err := x.stream.Recv(m) err := x.stream.Recv(m)
if err != nil { if err != nil {
return nil, err return nil, err
@ -166,7 +166,7 @@ type Debug_LogsStream interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
Send(*Log) error Send(*Record) error
} }
type debugLogsStream struct { type debugLogsStream struct {
@ -185,6 +185,6 @@ func (x *debugLogsStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *debugLogsStream) Send(m *Log) error { func (x *debugLogsStream) Send(m *Record) error {
return x.stream.Send(m) return x.stream.Send(m)
} }

View File

@ -1,9 +1,9 @@
syntax = "proto3"; syntax = "proto3";
service Debug { service Debug {
rpc Health(HealthRequest) returns (HealthResponse) {}; rpc Health(HealthRequest) returns (HealthResponse) {};
rpc Stats(StatsRequest) returns (StatsResponse) {}; rpc Stats(StatsRequest) returns (StatsResponse) {};
rpc Logs(LogRequest) returns (stream Log) {}; rpc Logs(LogRequest) returns (stream Record) {};
} }
message HealthRequest {} message HealthRequest {}
@ -28,24 +28,24 @@ message StatsResponse {
uint64 gc = 5; uint64 gc = 5;
} }
// LogRequest queries service for logs // LogRequest requests service logs
message LogRequest { message LogRequest {
// count is the count of events // count of records to request
int64 count = 1; int64 count = 1;
// relative time in seconds // relative time in seconds
// before the current time // before the current time
// from which to show logs // from which to show logs
int64 since = 2; int64 since = 2;
// stream logs continuously // stream records continuously
bool stream = 3; bool stream = 3;
} }
// Log is service log record // Record is service log record
message Log { message Record {
// timestamp of log event // timestamp of log record
int64 timestamp = 1; int64 timestamp = 1;
// log value // record value
string value = 2; string value = 2;
// metadata // record metadata
map<string,string> metadata = 3; map<string,string> metadata = 3;
} }

View File

@ -74,7 +74,7 @@ func (d *Debug) streamLogs(logChan chan log.Record, stream pb.Debug_LogsService)
} }
record := log.Record{ record := log.Record{
Timestamp: time.Unix(0, resp.Timestamp), Timestamp: time.Unix(resp.Timestamp, 0),
Value: resp.Value, Value: resp.Value,
Metadata: metadata, Metadata: metadata,
} }