From 82d269cfb451e5394faf8a733986a44b02a6e9b5 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 29 Sep 2024 22:58:53 +0300 Subject: [PATCH] xpool: add metrics Signed-off-by: Vasiliy Tolstov --- meter/meter.go | 2 + util/xpool/pool.go | 217 ++++++++++++++++++++++++++++++++++++++-- util/xpool/pool_test.go | 24 ++++- 3 files changed, 232 insertions(+), 11 deletions(-) diff --git a/meter/meter.go b/meter/meter.go index e5d994fe..42b20473 100644 --- a/meter/meter.go +++ b/meter/meter.go @@ -16,6 +16,8 @@ var ( DefaultAddress = ":9090" // DefaultPath the meter endpoint where the Meter data will be made available DefaultPath = "/metrics" + // DefaultMeterStatsInterval specifies interval for meter updating + DefaultMeterStatsInterval = 5 * time.Second // DefaultSummaryQuantiles is the default spread of stats for summary DefaultSummaryQuantiles = []float64{0.5, 0.9, 0.97, 0.99, 1} // DefaultSummaryWindow is the default window for summary diff --git a/util/xpool/pool.go b/util/xpool/pool.go index fa27df3a..b80cd482 100644 --- a/util/xpool/pool.go +++ b/util/xpool/pool.go @@ -2,13 +2,77 @@ package pool import ( "bytes" + "strconv" + "strings" "sync" + "sync/atomic" + "time" + + "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/semconv" +) + +var ( + pools = make([]Statser, 0) + poolsMu sync.Mutex +) + +// Stats struct +type Stats struct { + Get uint64 + Put uint64 + Mis uint64 + Ret uint64 +} + +// Statser provides buffer pool stats +type Statser interface { + Stats() Stats + Cap() int +} + +func init() { + go newStatsMeter() +} + +func newStatsMeter() { + ticker := time.NewTicker(meter.DefaultMeterStatsInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + poolsMu.Lock() + for _, st := range pools { + stats := st.Stats() + meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) + meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) + meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) + meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) + } + poolsMu.Unlock() + } + } +} + +var ( + _ Statser = (*BytePool)(nil) + _ Statser = (*BytesPool)(nil) + _ Statser = (*StringsPool)(nil) ) type Pool[T any] struct { p *sync.Pool } +func (p Pool[T]) Put(t T) { + p.p.Put(t) +} + +func (p Pool[T]) Get() T { + return p.p.Get().(T) +} + func NewPool[T any](fn func() T) Pool[T] { return Pool[T]{ p: &sync.Pool{ @@ -19,18 +83,155 @@ func NewPool[T any](fn func() T) Pool[T] { } } -func (p Pool[T]) Get() T { - return p.p.Get().(T) +type BytePool struct { + p *sync.Pool + get uint64 + put uint64 + mis uint64 + ret uint64 + c int } -func (p Pool[T]) Put(t T) { - p.p.Put(t) +func NewBytePool(size int) *BytePool { + p := &BytePool{c: size} + p.p = &sync.Pool{ + New: func() interface{} { + atomic.AddUint64(&p.mis, 1) + b := make([]byte, 0, size) + return &b + }, + } + poolsMu.Lock() + pools = append(pools, p) + poolsMu.Unlock() + return p } -func NewBytePool(size int) Pool[[]byte] { - return NewPool(func() []byte { return make([]byte, size) }) +func (p *BytePool) Cap() int { + return p.c } -func NewBytesPool() Pool[*bytes.Buffer] { - return NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) }) +func (p *BytePool) Stats() Stats { + return Stats{ + Put: atomic.LoadUint64(&p.put), + Get: atomic.LoadUint64(&p.get), + Mis: atomic.LoadUint64(&p.mis), + Ret: atomic.LoadUint64(&p.ret), + } +} + +func (p *BytePool) Get() *[]byte { + atomic.AddUint64(&p.get, 1) + return p.p.Get().(*[]byte) +} + +func (p *BytePool) Put(b *[]byte) { + atomic.AddUint64(&p.put, 1) + if cap(*b) > p.c { + atomic.AddUint64(&p.ret, 1) + return + } + *b = (*b)[:0] + p.p.Put(b) +} + +type BytesPool struct { + p *sync.Pool + get uint64 + put uint64 + mis uint64 + ret uint64 + c int +} + +func NewBytesPool(size int) *BytesPool { + p := &BytesPool{c: size} + p.p = &sync.Pool{ + New: func() interface{} { + atomic.AddUint64(&p.mis, 1) + b := bytes.NewBuffer(make([]byte, 0, size)) + return b + }, + } + poolsMu.Lock() + pools = append(pools, p) + poolsMu.Unlock() + return p +} + +func (p *BytesPool) Cap() int { + return p.c +} + +func (p *BytesPool) Stats() Stats { + return Stats{ + Put: atomic.LoadUint64(&p.put), + Get: atomic.LoadUint64(&p.get), + Mis: atomic.LoadUint64(&p.mis), + Ret: atomic.LoadUint64(&p.ret), + } +} + +func (p *BytesPool) Get() *bytes.Buffer { + return p.p.Get().(*bytes.Buffer) +} + +func (p *BytesPool) Put(b *bytes.Buffer) { + if (*b).Cap() > p.c { + atomic.AddUint64(&p.ret, 1) + return + } + b.Reset() + p.p.Put(b) +} + +type StringsPool struct { + p *sync.Pool + get uint64 + put uint64 + mis uint64 + ret uint64 + c int +} + +func NewStringsPool(size int) *StringsPool { + p := &StringsPool{c: size} + p.p = &sync.Pool{ + New: func() interface{} { + atomic.AddUint64(&p.mis, 1) + return &strings.Builder{} + }, + } + poolsMu.Lock() + pools = append(pools, p) + poolsMu.Unlock() + return p +} + +func (p *StringsPool) Cap() int { + return p.c +} + +func (p *StringsPool) Stats() Stats { + return Stats{ + Put: atomic.LoadUint64(&p.put), + Get: atomic.LoadUint64(&p.get), + Mis: atomic.LoadUint64(&p.mis), + Ret: atomic.LoadUint64(&p.ret), + } +} + +func (p *StringsPool) Get() *strings.Builder { + atomic.AddUint64(&p.get, 1) + return p.p.Get().(*strings.Builder) +} + +func (p *StringsPool) Put(b *strings.Builder) { + atomic.AddUint64(&p.put, 1) + if b.Cap() > p.c { + atomic.AddUint64(&p.ret, 1) + return + } + b.Reset() + p.p.Put(b) } diff --git a/util/xpool/pool_test.go b/util/xpool/pool_test.go index 8e7a9b81..710a0024 100644 --- a/util/xpool/pool_test.go +++ b/util/xpool/pool_test.go @@ -2,12 +2,30 @@ package pool import ( "bytes" - "strings" "testing" ) +func TestByte(t *testing.T) { + p := NewBytePool(1024) + b := p.Get() + copy(*b, []byte(`test`)) + if bytes.Equal(*b, []byte("test")) { + t.Fatal("pool not works") + } + p.Put(b) + b = p.Get() + for i := 0; i < 1500; i++ { + *b = append(*b, []byte(`test`)...) + } + p.Put(b) + st := p.Stats() + if st.Get != 2 && st.Put != 2 && st.Mis != 1 && st.Ret != 1 { + t.Fatalf("pool stats error %#+v", st) + } +} + func TestBytes(t *testing.T) { - p := NewPool(func() *bytes.Buffer { return bytes.NewBuffer(nil) }) + p := NewBytesPool(1024) b := p.Get() b.Write([]byte(`test`)) if b.String() != "test" { @@ -17,7 +35,7 @@ func TestBytes(t *testing.T) { } func TestStrings(t *testing.T) { - p := NewPool(func() *strings.Builder { return &strings.Builder{} }) + p := NewStringsPool(20) b := p.Get() b.Write([]byte(`test`)) if b.String() != "test" {