add requests/errors to stats

This commit is contained in:
Asim Aslam 2019-12-18 18:36:42 +00:00
parent d4bec24eb7
commit d52a111735
8 changed files with 197 additions and 59 deletions

View File

@ -15,6 +15,8 @@ import (
// Should stream from OS // Should stream from OS
type osLog struct { type osLog struct {
once sync.Once
sync.RWMutex sync.RWMutex
buffer *ring.Buffer buffer *ring.Buffer
subs map[string]*osStream subs map[string]*osStream
@ -118,6 +120,10 @@ func (o *osLog) run() {
// Read reads log entries from the logger // Read reads log entries from the logger
func (o *osLog) Read(...ReadOption) ([]Record, error) { func (o *osLog) Read(...ReadOption) ([]Record, error) {
o.once.Do(func() {
go o.run()
})
var records []Record var records []Record
// read the last 100 records // read the last 100 records
@ -130,6 +136,10 @@ func (o *osLog) Read(...ReadOption) ([]Record, error) {
// Write writes records to log // Write writes records to log
func (o *osLog) Write(r Record) error { func (o *osLog) Write(r Record) error {
o.once.Do(func() {
go o.run()
})
b, _ := json.Marshal(r) b, _ := json.Marshal(r)
_, err := os.Stderr.Write(append(b, byte('\n'))) _, err := os.Stderr.Write(append(b, byte('\n')))
return err return err
@ -137,6 +147,10 @@ func (o *osLog) Write(r Record) error {
// Stream log records // Stream log records
func (o *osLog) Stream() (Stream, error) { func (o *osLog) Stream() (Stream, error) {
o.once.Do(func() {
go o.run()
})
o.Lock() o.Lock()
defer o.Unlock() defer o.Unlock()
@ -172,7 +186,5 @@ func NewLog(opts ...Option) Log {
subs: make(map[string]*osStream), subs: make(map[string]*osStream),
} }
go l.run()
return l return l
} }

View File

@ -3,11 +3,11 @@ package handler
import ( import (
"context" "context"
"runtime"
"time" "time"
"github.com/micro/go-micro/debug/log" "github.com/micro/go-micro/debug/log"
proto "github.com/micro/go-micro/debug/service/proto" proto "github.com/micro/go-micro/debug/service/proto"
"github.com/micro/go-micro/debug/stats"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
@ -17,15 +17,18 @@ var (
) )
type Debug struct { type Debug struct {
started int64 // must honour the debug handler
proto.DebugHandler proto.DebugHandler
// the logger for retrieving logs
log log.Log log log.Log
// the stats collector
stats stats.Stats
} }
func newDebug() *Debug { func newDebug() *Debug {
return &Debug{ return &Debug{
started: time.Now().Unix(), log: log.DefaultLog,
log: log.DefaultLog, stats: stats.DefaultStats,
} }
} }
@ -35,15 +38,25 @@ func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto
} }
func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error { func (d *Debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error {
var mstat runtime.MemStats stats, err := d.stats.Read()
runtime.ReadMemStats(&mstat) if err != nil {
return err
}
if len(stats) == 0 {
return nil
}
// write the response values
rsp.Timestamp = uint64(stats[0].Timestamp)
rsp.Started = uint64(stats[0].Started)
rsp.Uptime = uint64(stats[0].Uptime)
rsp.Memory = stats[0].Memory
rsp.Gc = stats[0].GC
rsp.Threads = stats[0].Threads
rsp.Requests = stats[0].Requests
rsp.Errors = stats[0].Errors
rsp.Timestamp = uint64(time.Now().Unix())
rsp.Started = uint64(d.started)
rsp.Uptime = uint64(time.Now().Unix() - d.started)
rsp.Memory = mstat.Alloc
rsp.Gc = mstat.PauseTotalNs
rsp.Threads = uint64(runtime.NumGoroutine())
return nil return nil
} }
@ -78,7 +91,17 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
defer lgStream.Stop() defer lgStream.Stop()
for record := range lgStream.Chan() { for record := range lgStream.Chan() {
if err := d.sendRecord(record, stream); err != nil { // copy metadata
metadata := make(map[string]string)
for k, v := range record.Metadata {
metadata[k] = v
}
// send record
if err := stream.Send(&proto.Record{
Timestamp: record.Timestamp.Unix(),
Message: record.Message.(string),
Metadata: metadata,
}); err != nil {
return err return err
} }
} }
@ -95,23 +118,20 @@ func (d *Debug) Log(ctx context.Context, stream server.Stream) error {
// send all the logs downstream // send all the logs downstream
for _, record := range records { for _, record := range records {
if err := d.sendRecord(record, stream); err != nil { // copy metadata
metadata := make(map[string]string)
for k, v := range record.Metadata {
metadata[k] = v
}
// send record
if err := stream.Send(&proto.Record{
Timestamp: record.Timestamp.Unix(),
Message: record.Message.(string),
Metadata: metadata,
}); err != nil {
return err return err
} }
} }
return nil return nil
} }
func (d *Debug) sendRecord(record log.Record, stream server.Stream) error {
metadata := make(map[string]string)
for k, v := range record.Metadata {
metadata[k] = v
}
return stream.Send(&proto.Record{
Timestamp: record.Timestamp.Unix(),
Message: record.Message.(string),
Metadata: metadata,
})
}

View File

@ -152,7 +152,11 @@ type StatsResponse struct {
// num threads // num threads
Threads uint64 `protobuf:"varint,5,opt,name=threads,proto3" json:"threads,omitempty"` Threads uint64 `protobuf:"varint,5,opt,name=threads,proto3" json:"threads,omitempty"`
// total gc in nanoseconds // total gc in nanoseconds
Gc uint64 `protobuf:"varint,6,opt,name=gc,proto3" json:"gc,omitempty"` Gc uint64 `protobuf:"varint,6,opt,name=gc,proto3" json:"gc,omitempty"`
// total number of requests
Requests uint64 `protobuf:"varint,7,opt,name=requests,proto3" json:"requests,omitempty"`
// total number of errors
Errors uint64 `protobuf:"varint,8,opt,name=errors,proto3" json:"errors,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -225,6 +229,20 @@ func (m *StatsResponse) GetGc() uint64 {
return 0 return 0
} }
func (m *StatsResponse) GetRequests() uint64 {
if m != nil {
return m.Requests
}
return 0
}
func (m *StatsResponse) GetErrors() uint64 {
if m != nil {
return m.Errors
}
return 0
}
// LogRequest requests service logs // LogRequest requests service logs
type LogRequest struct { type LogRequest struct {
// service to request logs for // service to request logs for
@ -369,31 +387,32 @@ func init() {
} }
var fileDescriptor_dea322649cde1ef2 = []byte{ var fileDescriptor_dea322649cde1ef2 = []byte{
// 407 bytes of a gzipped FileDescriptorProto // 427 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xcf, 0x6e, 0x13, 0x31, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xcd, 0x6e, 0x13, 0x31,
0x10, 0xc6, 0xb3, 0xbb, 0xcd, 0xb6, 0x99, 0x92, 0x80, 0x2c, 0x40, 0x56, 0x84, 0x44, 0xe5, 0xd3, 0x14, 0x85, 0x33, 0x33, 0xcd, 0x24, 0xb9, 0x25, 0x01, 0x59, 0x80, 0xac, 0x11, 0x12, 0x95, 0x57,
0x22, 0x84, 0x03, 0xe5, 0x82, 0xe0, 0x0a, 0x12, 0x87, 0x72, 0x31, 0x4f, 0xe0, 0xee, 0x8e, 0xb6, 0x83, 0x10, 0x0e, 0x94, 0x0d, 0x82, 0x2d, 0x48, 0x2c, 0xca, 0xc6, 0x3c, 0x81, 0x3b, 0x73, 0x35,
0x81, 0x3a, 0x0e, 0xf6, 0x6c, 0xa5, 0x9c, 0x78, 0x16, 0xde, 0x80, 0x47, 0x44, 0xfe, 0xb3, 0xb4, 0x0d, 0xd4, 0x71, 0xb0, 0xef, 0x54, 0xca, 0x8a, 0x57, 0xe2, 0x65, 0x78, 0x1f, 0xe4, 0x9f, 0xb4,
0xab, 0x1e, 0x7a, 0xf3, 0xef, 0xf3, 0xe7, 0xd1, 0xcc, 0xf8, 0x03, 0x69, 0xb6, 0xad, 0xb3, 0x9b, 0x8d, 0x58, 0x74, 0xe7, 0xef, 0xcc, 0xf5, 0x91, 0xcf, 0x9d, 0x03, 0xd2, 0x6c, 0x3a, 0x67, 0xd7,
0xde, 0xbe, 0x49, 0x87, 0x0e, 0x2f, 0x87, 0x7e, 0xe3, 0xd1, 0xdd, 0x6c, 0x5b, 0xdc, 0xec, 0x9d, 0x83, 0x7d, 0x93, 0x0e, 0x3d, 0x5e, 0x8e, 0xc3, 0xda, 0xa3, 0xbb, 0xd9, 0x74, 0xb8, 0xde, 0x39,
0xa5, 0xac, 0xc9, 0x78, 0x16, 0xaf, 0x60, 0xf9, 0x15, 0xf5, 0x35, 0x5d, 0x29, 0xfc, 0x35, 0xa0, 0x4b, 0x59, 0x93, 0xf1, 0x2c, 0x5e, 0xc1, 0xf2, 0x2b, 0xea, 0x6b, 0xba, 0x52, 0xf8, 0x6b, 0x44,
0x27, 0xc6, 0xe1, 0x38, 0xbb, 0x79, 0x71, 0x56, 0x34, 0x0b, 0x35, 0xa2, 0x68, 0x60, 0x35, 0x5a, 0x4f, 0x8c, 0xc3, 0x2c, 0x4f, 0xf3, 0xe2, 0xac, 0x68, 0x17, 0xea, 0x80, 0xa2, 0x85, 0xd5, 0x61,
0xfd, 0xde, 0xee, 0x3c, 0xb2, 0xe7, 0x50, 0x7b, 0xd2, 0x34, 0xf8, 0x6c, 0xcd, 0x24, 0x1a, 0x78, 0xd4, 0xef, 0xec, 0xd6, 0x23, 0x7b, 0x0e, 0xb5, 0x27, 0x4d, 0xa3, 0xcf, 0xa3, 0x99, 0x44, 0x0b,
0xf4, 0x9d, 0x34, 0xf9, 0x87, 0x6b, 0xfe, 0x29, 0x60, 0x99, 0xad, 0xb9, 0xe6, 0x0b, 0x58, 0xd0, 0x8f, 0xbe, 0x93, 0x26, 0xff, 0xb0, 0xe7, 0xdf, 0x02, 0x96, 0x79, 0x34, 0x7b, 0xbe, 0x80, 0x05,
0xd6, 0xa0, 0x27, 0x6d, 0xf6, 0xd1, 0x7d, 0xa4, 0x6e, 0x85, 0x58, 0x89, 0xb4, 0x23, 0xec, 0x78, 0x6d, 0x0c, 0x7a, 0xd2, 0x66, 0x17, 0xa7, 0x4f, 0xd4, 0x9d, 0x10, 0x9d, 0x48, 0x3b, 0xc2, 0x9e,
0x19, 0xef, 0x46, 0x0c, 0xbd, 0x0c, 0xfb, 0x60, 0xe4, 0x55, 0xbc, 0xc8, 0x14, 0x74, 0x83, 0xc6, 0x97, 0xf1, 0xdb, 0x01, 0xc3, 0x5b, 0xc6, 0x5d, 0x18, 0xe4, 0x55, 0xfc, 0x90, 0x29, 0xe8, 0x06,
0xba, 0x03, 0x3f, 0x4a, 0x7a, 0xa2, 0x50, 0x89, 0xae, 0x1c, 0xea, 0xce, 0xf3, 0x79, 0xaa, 0x94, 0x8d, 0x75, 0x7b, 0x7e, 0x92, 0xf4, 0x44, 0xc1, 0x89, 0xae, 0x1c, 0xea, 0xde, 0xf3, 0x69, 0x72,
0x91, 0xad, 0xa0, 0xec, 0x5b, 0x5e, 0x47, 0xb1, 0xec, 0x5b, 0xf1, 0x03, 0xe0, 0xc2, 0xf6, 0x0f, 0xca, 0xc8, 0x56, 0x50, 0x0e, 0x1d, 0xaf, 0xa3, 0x58, 0x0e, 0x1d, 0x6b, 0x60, 0xee, 0x52, 0x10,
0xce, 0x92, 0xb6, 0xe1, 0x50, 0x9b, 0xd8, 0xda, 0x89, 0xca, 0xc4, 0x9e, 0xc2, 0xbc, 0xb5, 0xc3, 0xcf, 0x67, 0x51, 0xbd, 0xe5, 0xe0, 0x8e, 0xce, 0x59, 0xe7, 0xf9, 0x3c, 0xb9, 0x27, 0x12, 0x3f,
0x8e, 0x62, 0x63, 0x95, 0x4a, 0x10, 0x54, 0xbf, 0xdd, 0xb5, 0x18, 0xdb, 0xaa, 0x54, 0x02, 0xf1, 0x00, 0x2e, 0xec, 0xf0, 0x60, 0xfe, 0xb4, 0x41, 0x87, 0xda, 0xc4, 0x38, 0x73, 0x95, 0x89, 0x3d,
0xb7, 0x80, 0x5a, 0x61, 0x6b, 0x5d, 0x77, 0x7f, 0x11, 0xd5, 0xdd, 0x45, 0xbc, 0x83, 0x13, 0x83, 0x85, 0x69, 0x67, 0xc7, 0x2d, 0xc5, 0x30, 0x95, 0x4a, 0x10, 0x54, 0xbf, 0xd9, 0x76, 0x18, 0xa3,
0xa4, 0x3b, 0x4d, 0x9a, 0x97, 0x67, 0x55, 0x73, 0x7a, 0xfe, 0x4c, 0xa6, 0x87, 0xf2, 0x5b, 0xd6, 0x54, 0x2a, 0x81, 0xf8, 0x53, 0x40, 0xad, 0xb0, 0xb3, 0xae, 0xff, 0x7f, 0x79, 0xd5, 0xfd, 0xe5,
0xbf, 0xec, 0xc8, 0x1d, 0xd4, 0x7f, 0x5b, 0xe8, 0xdc, 0xa0, 0xf7, 0xba, 0x4f, 0x2b, 0x5a, 0xa8, 0xbd, 0x83, 0xb9, 0x41, 0xd2, 0xbd, 0x26, 0xcd, 0xcb, 0xb3, 0xaa, 0x3d, 0x3d, 0x7f, 0x26, 0xd3,
0x11, 0xd7, 0x9f, 0x60, 0x39, 0x79, 0xc4, 0x9e, 0x40, 0xf5, 0x13, 0x0f, 0x79, 0xc0, 0x70, 0x0c, 0x45, 0xf9, 0x2d, 0xeb, 0x5f, 0xb6, 0xe4, 0xf6, 0xea, 0x76, 0x2c, 0xbc, 0xdc, 0xa0, 0xf7, 0x7a,
0xed, 0xde, 0xe8, 0xeb, 0x01, 0xe3, 0x6c, 0x0b, 0x95, 0xe0, 0x63, 0xf9, 0xa1, 0x38, 0xff, 0x0d, 0x48, 0x6b, 0x5d, 0xa8, 0x03, 0x36, 0x9f, 0x60, 0x79, 0x74, 0x89, 0x3d, 0x81, 0xea, 0x27, 0xee,
0xf3, 0xcf, 0x21, 0x50, 0xec, 0x35, 0xd4, 0x29, 0x1f, 0x6c, 0x25, 0x27, 0x99, 0x5a, 0x3f, 0x96, 0x73, 0xc0, 0x70, 0x0c, 0xcf, 0xbd, 0xd1, 0xd7, 0x23, 0xc6, 0x6c, 0x0b, 0x95, 0xe0, 0x63, 0xf9,
0xd3, 0xe0, 0x88, 0x19, 0x6b, 0x60, 0x1e, 0xff, 0x9d, 0x2d, 0xe5, 0xdd, 0xa8, 0xac, 0x57, 0x72, 0xa1, 0x38, 0xff, 0x0d, 0xd3, 0xcf, 0xa1, 0x84, 0xec, 0x35, 0xd4, 0xa9, 0x53, 0x6c, 0x25, 0x8f,
0x12, 0x07, 0x31, 0x63, 0x2f, 0xa1, 0xba, 0xb0, 0x3d, 0x3b, 0x95, 0xb7, 0x9f, 0xb0, 0x3e, 0xce, 0x7a, 0xd8, 0x3c, 0x96, 0xc7, 0x65, 0x13, 0x13, 0xd6, 0xc2, 0x34, 0x76, 0x85, 0x2d, 0xe5, 0xfd,
0xb3, 0x8a, 0xd9, 0xdb, 0xe2, 0xb2, 0x8e, 0x49, 0x7e, 0xff, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x4e, 0x7a, 0x35, 0x2b, 0x79, 0x54, 0x21, 0x31, 0x61, 0x2f, 0xa1, 0xba, 0xb0, 0x03, 0x3b, 0x95, 0x77,
0xd3, 0x4c, 0xce, 0xfb, 0x02, 0x00, 0x00, 0x3f, 0xa1, 0x99, 0xe5, 0xac, 0x62, 0xf2, 0xb6, 0xb8, 0xac, 0x63, 0xfb, 0xdf, 0xff, 0x0b, 0x00,
0x00, 0xff, 0xff, 0x69, 0xc0, 0x33, 0x21, 0x2f, 0x03, 0x00, 0x00,
} }

View File

@ -34,6 +34,10 @@ message StatsResponse {
uint64 threads = 5; uint64 threads = 5;
// total gc in nanoseconds // total gc in nanoseconds
uint64 gc = 6; uint64 gc = 6;
// total number of requests
uint64 requests = 7;
// total number of errors
uint64 errors = 8;
} }
// LogRequest requests service logs // LogRequest requests service logs

View File

@ -1,18 +1,50 @@
package stats package stats
import ( import (
"runtime"
"sync"
"time"
"github.com/micro/go-micro/util/ring" "github.com/micro/go-micro/util/ring"
) )
type stats struct { type stats struct {
// used to store past stats
buffer *ring.Buffer buffer *ring.Buffer
sync.RWMutex
started int64
requests uint64
errors uint64
}
func (s *stats) snapshot() *Stat {
s.RLock()
defer s.RUnlock()
var mstat runtime.MemStats
runtime.ReadMemStats(&mstat)
now := time.Now().Unix()
return &Stat{
Timestamp: now,
Started: s.started,
Uptime: now - s.started,
Memory: mstat.Alloc,
GC: mstat.PauseTotalNs,
Threads: uint64(runtime.NumGoroutine()),
Requests: s.requests,
Errors: s.errors,
}
} }
func (s *stats) Read() ([]*Stat, error) { func (s *stats) Read() ([]*Stat, error) {
// TODO adjustable size and optional read values // TODO adjustable size and optional read values
buf := s.buffer.Get(1) buf := s.buffer.Get(60)
var stats []*Stat var stats []*Stat
// get a value from the buffer if it exists
for _, b := range buf { for _, b := range buf {
stat, ok := b.Value.(*Stat) stat, ok := b.Value.(*Stat)
if !ok { if !ok {
@ -21,6 +53,9 @@ func (s *stats) Read() ([]*Stat, error) {
stats = append(stats, stat) stats = append(stats, stat)
} }
// get a snapshot
stats = append(stats, s.snapshot())
return stats, nil return stats, nil
} }
@ -29,10 +64,26 @@ func (s *stats) Write(stat *Stat) error {
return nil return nil
} }
func (s *stats) Record(err error) error {
s.Lock()
defer s.Unlock()
// increment the total request count
s.requests++
// increment the error count
if err != nil {
s.errors++
}
return nil
}
// NewStats returns a new in memory stats buffer // NewStats returns a new in memory stats buffer
// TODO add options // TODO add options
func NewStats() Stats { func NewStats() Stats {
return &stats{ return &stats{
buffer: ring.New(1024), started: time.Now().Unix(),
buffer: ring.New(60),
} }
} }

View File

@ -7,6 +7,8 @@ type Stats interface {
Read() ([]*Stat, error) Read() ([]*Stat, error)
// Write a stat snapshot // Write a stat snapshot
Write(*Stat) error Write(*Stat) error
// Record a request
Record(error) error
} }
// A runtime stat // A runtime stat
@ -23,4 +25,12 @@ type Stat struct {
Threads uint64 Threads uint64
// Garbage collection in nanoseconds // Garbage collection in nanoseconds
GC uint64 GC uint64
// Total requests
Requests uint64
// Total errors
Errors uint64
} }
var (
DefaultStats = NewStats()
)

View File

@ -14,6 +14,7 @@ import (
"github.com/micro/go-micro/debug/profile/http" "github.com/micro/go-micro/debug/profile/http"
"github.com/micro/go-micro/debug/profile/pprof" "github.com/micro/go-micro/debug/profile/pprof"
"github.com/micro/go-micro/debug/service/handler" "github.com/micro/go-micro/debug/service/handler"
"github.com/micro/go-micro/debug/stats"
"github.com/micro/go-micro/plugin" "github.com/micro/go-micro/plugin"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/util/log" "github.com/micro/go-micro/util/log"
@ -35,6 +36,9 @@ func newService(opts ...Option) Service {
// wrap client to inject From-Service header on any calls // wrap client to inject From-Service header on any calls
options.Client = wrapper.FromService(serviceName, options.Client) options.Client = wrapper.FromService(serviceName, options.Client)
// wrap the server to provide handler stats
options.Server.Init(server.WrapHandler(wrapper.HandlerStats(stats.DefaultStats)))
return &service{ return &service{
opts: options, opts: options,
} }

View File

@ -4,7 +4,9 @@ import (
"context" "context"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/debug/stats"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/server"
) )
type clientWrapper struct { type clientWrapper struct {
@ -55,3 +57,19 @@ func FromService(name string, c client.Client) client.Client {
}, },
} }
} }
// HandlerStats wraps a server handler to generate request/error stats
func HandlerStats(stats stats.Stats) server.HandlerWrapper {
// return a handler wrapper
return func(h server.HandlerFunc) server.HandlerFunc {
// return a function that returns a function
return func(ctx context.Context, req server.Request, rsp interface{}) error {
// execute the handler
err := h(ctx, req, rsp)
// record the stats
stats.Record(err)
// return the error
return err
}
}
}