From b0cbddcfddffc9fdb2cfb0ed9d44284ceca9ee49 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 7 Oct 2025 23:54:20 +0300 Subject: [PATCH] meter: improve meter usage across micro framework (#409) Reviewed-on: https://git.unistack.org/unistack-org/micro/pulls/409 Co-authored-by: Vasiliy Tolstov Co-committed-by: Vasiliy Tolstov --- hooks/sql/stats.go | 100 ++++++++++++++----- meter/meter.go | 2 + meter/noop.go | 4 + util/xpool/pool.go | 238 +++++++++++++++++++++++++++------------------ 4 files changed, 229 insertions(+), 115 deletions(-) diff --git a/hooks/sql/stats.go b/hooks/sql/stats.go index b0f086f6..4d36ed44 100644 --- a/hooks/sql/stats.go +++ b/hooks/sql/stats.go @@ -3,6 +3,7 @@ package sql import ( "context" "database/sql" + "sync" "time" ) @@ -11,31 +12,84 @@ type Statser interface { } func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { + if db == nil { + return + } + options := NewOptions(opts...) - go func() { - ticker := time.NewTicker(options.MeterStatsInterval) - defer ticker.Stop() + var ( + statsMu sync.Mutex + lastUpdated time.Time + maxOpenConnections, openConnections, inUse, idle, waitCount float64 + maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64 + waitDuration float64 + ) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if db == nil { - return - } - stats := db.Stats() - options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) - options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) - options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse)) - options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle)) - options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount)) - options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) - options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) - options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed)) - options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) - } + updateFn := func() { + statsMu.Lock() + defer statsMu.Unlock() + + if time.Since(lastUpdated) < options.MeterStatsInterval { + return } - }() + + stats := db.Stats() + maxOpenConnections = float64(stats.MaxOpenConnections) + openConnections = float64(stats.OpenConnections) + inUse = float64(stats.InUse) + idle = float64(stats.Idle) + waitCount = float64(stats.WaitCount) + maxIdleClosed = float64(stats.MaxIdleClosed) + maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed) + maxLifetimeClosed = float64(stats.MaxLifetimeClosed) + waitDuration = float64(stats.WaitDuration.Seconds()) + + lastUpdated = time.Now() + } + + options.Meter.Gauge(MaxOpenConnections, func() float64 { + updateFn() + return maxOpenConnections + }) + + options.Meter.Gauge(OpenConnections, func() float64 { + updateFn() + return openConnections + }) + + options.Meter.Gauge(InuseConnections, func() float64 { + updateFn() + return inUse + }) + + options.Meter.Gauge(IdleConnections, func() float64 { + updateFn() + return idle + }) + + options.Meter.Gauge(WaitConnections, func() float64 { + updateFn() + return waitCount + }) + + options.Meter.Gauge(BlockedSeconds, func() float64 { + updateFn() + return waitDuration + }) + + options.Meter.Gauge(MaxIdleClosed, func() float64 { + updateFn() + return maxIdleClosed + }) + + options.Meter.Gauge(MaxIdletimeClosed, func() float64 { + updateFn() + return maxIdleTimeClosed + }) + + options.Meter.Gauge(MaxLifetimeClosed, func() float64 { + updateFn() + return maxLifetimeClosed + }) } diff --git a/meter/meter.go b/meter/meter.go index 9062efae..9741b7fa 100644 --- a/meter/meter.go +++ b/meter/meter.go @@ -59,6 +59,8 @@ type Meter interface { Options() Options // String return meter type String() string + // Unregister metric name and drop all data + Unregister(name string, labels ...string) bool } // Counter is a counter diff --git a/meter/noop.go b/meter/noop.go index a65c558b..f6d2ea0d 100644 --- a/meter/noop.go +++ b/meter/noop.go @@ -28,6 +28,10 @@ func (r *noopMeter) Name() string { return r.opts.Name } +func (r *noopMeter) Unregister(name string, labels ...string) bool { + return true +} + // Init initialize options func (r *noopMeter) Init(opts ...Option) error { for _, o := range opts { diff --git a/util/xpool/pool.go b/util/xpool/pool.go index d6181ca4..4312b12b 100644 --- a/util/xpool/pool.go +++ b/util/xpool/pool.go @@ -6,18 +6,18 @@ import ( "strings" "sync" "sync/atomic" - "time" "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/semconv" ) -var ( - pools = make([]Statser, 0) - poolsMu sync.Mutex -) +func unregisterMetrics(size int) { + meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) + meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) + meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) + meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size)) +} -// Stats struct type Stats struct { Get uint64 Put uint64 @@ -25,41 +25,13 @@ type Stats struct { Ret uint64 } -// Statser provides buffer pool stats -type Statser interface { - Stats() Stats - Cap() int -} - -func init() { - go newStatsMeter() -} - -func newStatsMeter() { - ticker := time.NewTicker(meter.DefaultMeterStatsInterval) - defer ticker.Stop() - - for range ticker.C { - poolsMu.Lock() - for _, st := range pools { - stats := st.Stats() - meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) - meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) - meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) - meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) - } - poolsMu.Unlock() - } -} - -var ( - _ Statser = (*BytePool)(nil) - _ Statser = (*BytesPool)(nil) - _ Statser = (*StringsPool)(nil) -) - type Pool[T any] struct { - p *sync.Pool + p *sync.Pool + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 + c int } func (p Pool[T]) Put(t T) { @@ -70,37 +42,82 @@ func (p Pool[T]) Get() T { return p.p.Get().(T) } -func NewPool[T any](fn func() T) Pool[T] { - return Pool[T]{ - p: &sync.Pool{ - New: func() interface{} { - return fn() - }, +func NewPool[T any](fn func() T, size int) Pool[T] { + p := Pool[T]{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } + + p.p = &sync.Pool{ + New: func() interface{} { + p.mis.Add(1) + return fn() }, } + + meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { + return float64(p.get.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { + return float64(p.put.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { + return float64(p.mis.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { + return float64(p.ret.Load()) + }, "capacity", strconv.Itoa(p.c)) + + return p } type BytePool struct { p *sync.Pool - get uint64 - put uint64 - mis uint64 - ret uint64 + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 c int } func NewBytePool(size int) *BytePool { - p := &BytePool{c: size} + p := &BytePool{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } p.p = &sync.Pool{ New: func() interface{} { - atomic.AddUint64(&p.mis, 1) + p.mis.Add(1) b := make([]byte, 0, size) return &b }, } - poolsMu.Lock() - pools = append(pools, p) - poolsMu.Unlock() + + meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { + return float64(p.get.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { + return float64(p.put.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { + return float64(p.mis.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { + return float64(p.ret.Load()) + }, "capacity", strconv.Itoa(p.c)) + return p } @@ -110,49 +127,73 @@ func (p *BytePool) Cap() int { func (p *BytePool) Stats() Stats { return Stats{ - Put: atomic.LoadUint64(&p.put), - Get: atomic.LoadUint64(&p.get), - Mis: atomic.LoadUint64(&p.mis), - Ret: atomic.LoadUint64(&p.ret), + Put: p.put.Load(), + Get: p.get.Load(), + Mis: p.mis.Load(), + Ret: p.ret.Load(), } } func (p *BytePool) Get() *[]byte { - atomic.AddUint64(&p.get, 1) + p.get.Add(1) return p.p.Get().(*[]byte) } func (p *BytePool) Put(b *[]byte) { - atomic.AddUint64(&p.put, 1) + p.put.Add(1) if cap(*b) > p.c { - atomic.AddUint64(&p.ret, 1) + p.ret.Add(1) return } *b = (*b)[:0] p.p.Put(b) } +func (p *BytePool) Close() { + unregisterMetrics(p.c) +} + type BytesPool struct { p *sync.Pool - get uint64 - put uint64 - mis uint64 - ret uint64 + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 c int } func NewBytesPool(size int) *BytesPool { - p := &BytesPool{c: size} + p := &BytesPool{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } p.p = &sync.Pool{ New: func() interface{} { - atomic.AddUint64(&p.mis, 1) + p.mis.Add(1) b := bytes.NewBuffer(make([]byte, 0, size)) return b }, } - poolsMu.Lock() - pools = append(pools, p) - poolsMu.Unlock() + + meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { + return float64(p.get.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { + return float64(p.put.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { + return float64(p.mis.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { + return float64(p.ret.Load()) + }, "capacity", strconv.Itoa(p.c)) + return p } @@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int { func (p *BytesPool) Stats() Stats { return Stats{ - Put: atomic.LoadUint64(&p.put), - Get: atomic.LoadUint64(&p.get), - Mis: atomic.LoadUint64(&p.mis), - Ret: atomic.LoadUint64(&p.ret), + Put: p.put.Load(), + Get: p.get.Load(), + Mis: p.mis.Load(), + Ret: p.ret.Load(), } } @@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer { } func (p *BytesPool) Put(b *bytes.Buffer) { + p.put.Add(1) if (*b).Cap() > p.c { - atomic.AddUint64(&p.ret, 1) + p.ret.Add(1) return } b.Reset() p.p.Put(b) } +func (p *BytesPool) Close() { + unregisterMetrics(p.c) +} + type StringsPool struct { p *sync.Pool - get uint64 - put uint64 - mis uint64 - ret uint64 + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 c int } func NewStringsPool(size int) *StringsPool { - p := &StringsPool{c: size} + p := &StringsPool{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } p.p = &sync.Pool{ New: func() interface{} { - atomic.AddUint64(&p.mis, 1) + p.mis.Add(1) return &strings.Builder{} }, } - poolsMu.Lock() - pools = append(pools, p) - poolsMu.Unlock() + return p } @@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int { func (p *StringsPool) Stats() Stats { return Stats{ - Put: atomic.LoadUint64(&p.put), - Get: atomic.LoadUint64(&p.get), - Mis: atomic.LoadUint64(&p.mis), - Ret: atomic.LoadUint64(&p.ret), + Put: p.put.Load(), + Get: p.get.Load(), + Mis: p.mis.Load(), + Ret: p.ret.Load(), } } func (p *StringsPool) Get() *strings.Builder { - atomic.AddUint64(&p.get, 1) + p.get.Add(1) return p.p.Get().(*strings.Builder) } func (p *StringsPool) Put(b *strings.Builder) { - atomic.AddUint64(&p.put, 1) + p.put.Add(1) if b.Cap() > p.c { - atomic.AddUint64(&p.ret, 1) + p.ret.Add(1) return } b.Reset() p.p.Put(b) } + +func (p *StringsPool) Close() { + unregisterMetrics(p.c) +}