meter: improve meter usage across micro framework #409
@@ -3,6 +3,7 @@ package sql
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -11,31 +12,84 @@ type Statser interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
|
func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) {
|
||||||
|
if db == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
options := NewOptions(opts...)
|
options := NewOptions(opts...)
|
||||||
|
|
||||||
go func() {
|
var (
|
||||||
ticker := time.NewTicker(options.MeterStatsInterval)
|
statsMu sync.Mutex
|
||||||
defer ticker.Stop()
|
lastUpdated time.Time
|
||||||
|
maxOpenConnections, openConnections, inUse, idle, waitCount float64
|
||||||
|
maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed float64
|
||||||
|
waitDuration float64
|
||||||
|
)
|
||||||
|
|
||||||
for {
|
updateFn := func() {
|
||||||
select {
|
statsMu.Lock()
|
||||||
case <-ctx.Done():
|
defer statsMu.Unlock()
|
||||||
return
|
|
||||||
case <-ticker.C:
|
if time.Since(lastUpdated) < options.MeterStatsInterval {
|
||||||
if db == nil {
|
return
|
||||||
return
|
|
||||||
}
|
|
||||||
stats := db.Stats()
|
|
||||||
options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections))
|
|
||||||
options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections))
|
|
||||||
options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse))
|
|
||||||
options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle))
|
|
||||||
options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount))
|
|
||||||
options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds())
|
|
||||||
options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed))
|
|
||||||
options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed))
|
|
||||||
options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
stats := db.Stats()
|
||||||
|
maxOpenConnections = float64(stats.MaxOpenConnections)
|
||||||
|
openConnections = float64(stats.OpenConnections)
|
||||||
|
inUse = float64(stats.InUse)
|
||||||
|
idle = float64(stats.Idle)
|
||||||
|
waitCount = float64(stats.WaitCount)
|
||||||
|
maxIdleClosed = float64(stats.MaxIdleClosed)
|
||||||
|
maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed)
|
||||||
|
maxLifetimeClosed = float64(stats.MaxLifetimeClosed)
|
||||||
|
waitDuration = float64(stats.WaitDuration.Seconds())
|
||||||
|
|
||||||
|
lastUpdated = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
options.Meter.Gauge(MaxOpenConnections, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return maxOpenConnections
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(OpenConnections, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return openConnections
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(InuseConnections, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return inUse
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(IdleConnections, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return idle
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(WaitConnections, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return waitCount
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(BlockedSeconds, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return waitDuration
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(MaxIdleClosed, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return maxIdleClosed
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(MaxIdletimeClosed, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return maxIdleTimeClosed
|
||||||
|
})
|
||||||
|
|
||||||
|
options.Meter.Gauge(MaxLifetimeClosed, func() float64 {
|
||||||
|
updateFn()
|
||||||
|
return maxLifetimeClosed
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,6 +59,8 @@ type Meter interface {
|
|||||||
Options() Options
|
Options() Options
|
||||||
// String return meter type
|
// String return meter type
|
||||||
String() string
|
String() string
|
||||||
|
// Unregister metric name and drop all data
|
||||||
|
Unregister(name string, labels ...string) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Counter is a counter
|
// Counter is a counter
|
||||||
|
|||||||
@@ -28,6 +28,10 @@ func (r *noopMeter) Name() string {
|
|||||||
return r.opts.Name
|
return r.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *noopMeter) Unregister(name string, labels ...string) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Init initialize options
|
// Init initialize options
|
||||||
func (r *noopMeter) Init(opts ...Option) error {
|
func (r *noopMeter) Init(opts ...Option) error {
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
|
|||||||
@@ -6,18 +6,18 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/meter"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v4/semconv"
|
"go.unistack.org/micro/v4/semconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
func unregisterMetrics(size int) {
|
||||||
pools = make([]Statser, 0)
|
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size))
|
||||||
poolsMu sync.Mutex
|
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size))
|
||||||
)
|
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size))
|
||||||
|
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
|
||||||
|
}
|
||||||
|
|
||||||
// Stats struct
|
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
Get uint64
|
Get uint64
|
||||||
Put uint64
|
Put uint64
|
||||||
@@ -25,41 +25,13 @@ type Stats struct {
|
|||||||
Ret uint64
|
Ret uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Statser provides buffer pool stats
|
|
||||||
type Statser interface {
|
|
||||||
Stats() Stats
|
|
||||||
Cap() int
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
go newStatsMeter()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newStatsMeter() {
|
|
||||||
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for range ticker.C {
|
|
||||||
poolsMu.Lock()
|
|
||||||
for _, st := range pools {
|
|
||||||
stats := st.Stats()
|
|
||||||
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
|
|
||||||
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
|
|
||||||
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
|
|
||||||
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
|
|
||||||
}
|
|
||||||
poolsMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
_ Statser = (*BytePool)(nil)
|
|
||||||
_ Statser = (*BytesPool)(nil)
|
|
||||||
_ Statser = (*StringsPool)(nil)
|
|
||||||
)
|
|
||||||
|
|
||||||
type Pool[T any] struct {
|
type Pool[T any] struct {
|
||||||
p *sync.Pool
|
p *sync.Pool
|
||||||
|
get *atomic.Uint64
|
||||||
|
put *atomic.Uint64
|
||||||
|
mis *atomic.Uint64
|
||||||
|
ret *atomic.Uint64
|
||||||
|
c int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p Pool[T]) Put(t T) {
|
func (p Pool[T]) Put(t T) {
|
||||||
@@ -70,37 +42,82 @@ func (p Pool[T]) Get() T {
|
|||||||
return p.p.Get().(T)
|
return p.p.Get().(T)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPool[T any](fn func() T) Pool[T] {
|
func NewPool[T any](fn func() T, size int) Pool[T] {
|
||||||
return Pool[T]{
|
p := Pool[T]{
|
||||||
p: &sync.Pool{
|
c: size,
|
||||||
New: func() interface{} {
|
get: &atomic.Uint64{},
|
||||||
return fn()
|
put: &atomic.Uint64{},
|
||||||
},
|
mis: &atomic.Uint64{},
|
||||||
|
ret: &atomic.Uint64{},
|
||||||
|
}
|
||||||
|
|
||||||
|
p.p = &sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
p.mis.Add(1)
|
||||||
|
return fn()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
|
||||||
|
return float64(p.get.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
|
||||||
|
return float64(p.put.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
|
||||||
|
return float64(p.mis.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
|
||||||
|
return float64(p.ret.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
type BytePool struct {
|
type BytePool struct {
|
||||||
p *sync.Pool
|
p *sync.Pool
|
||||||
get uint64
|
get *atomic.Uint64
|
||||||
put uint64
|
put *atomic.Uint64
|
||||||
mis uint64
|
mis *atomic.Uint64
|
||||||
ret uint64
|
ret *atomic.Uint64
|
||||||
c int
|
c int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBytePool(size int) *BytePool {
|
func NewBytePool(size int) *BytePool {
|
||||||
p := &BytePool{c: size}
|
p := &BytePool{
|
||||||
|
c: size,
|
||||||
|
get: &atomic.Uint64{},
|
||||||
|
put: &atomic.Uint64{},
|
||||||
|
mis: &atomic.Uint64{},
|
||||||
|
ret: &atomic.Uint64{},
|
||||||
|
}
|
||||||
p.p = &sync.Pool{
|
p.p = &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
atomic.AddUint64(&p.mis, 1)
|
p.mis.Add(1)
|
||||||
b := make([]byte, 0, size)
|
b := make([]byte, 0, size)
|
||||||
return &b
|
return &b
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
poolsMu.Lock()
|
|
||||||
pools = append(pools, p)
|
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
|
||||||
poolsMu.Unlock()
|
return float64(p.get.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
|
||||||
|
return float64(p.put.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
|
||||||
|
return float64(p.mis.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
|
||||||
|
return float64(p.ret.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,49 +127,73 @@ func (p *BytePool) Cap() int {
|
|||||||
|
|
||||||
func (p *BytePool) Stats() Stats {
|
func (p *BytePool) Stats() Stats {
|
||||||
return Stats{
|
return Stats{
|
||||||
Put: atomic.LoadUint64(&p.put),
|
Put: p.put.Load(),
|
||||||
Get: atomic.LoadUint64(&p.get),
|
Get: p.get.Load(),
|
||||||
Mis: atomic.LoadUint64(&p.mis),
|
Mis: p.mis.Load(),
|
||||||
Ret: atomic.LoadUint64(&p.ret),
|
Ret: p.ret.Load(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *BytePool) Get() *[]byte {
|
func (p *BytePool) Get() *[]byte {
|
||||||
atomic.AddUint64(&p.get, 1)
|
p.get.Add(1)
|
||||||
return p.p.Get().(*[]byte)
|
return p.p.Get().(*[]byte)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *BytePool) Put(b *[]byte) {
|
func (p *BytePool) Put(b *[]byte) {
|
||||||
atomic.AddUint64(&p.put, 1)
|
p.put.Add(1)
|
||||||
if cap(*b) > p.c {
|
if cap(*b) > p.c {
|
||||||
atomic.AddUint64(&p.ret, 1)
|
p.ret.Add(1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
*b = (*b)[:0]
|
*b = (*b)[:0]
|
||||||
p.p.Put(b)
|
p.p.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *BytePool) Close() {
|
||||||
|
unregisterMetrics(p.c)
|
||||||
|
}
|
||||||
|
|
||||||
type BytesPool struct {
|
type BytesPool struct {
|
||||||
p *sync.Pool
|
p *sync.Pool
|
||||||
get uint64
|
get *atomic.Uint64
|
||||||
put uint64
|
put *atomic.Uint64
|
||||||
mis uint64
|
mis *atomic.Uint64
|
||||||
ret uint64
|
ret *atomic.Uint64
|
||||||
c int
|
c int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBytesPool(size int) *BytesPool {
|
func NewBytesPool(size int) *BytesPool {
|
||||||
p := &BytesPool{c: size}
|
p := &BytesPool{
|
||||||
|
c: size,
|
||||||
|
get: &atomic.Uint64{},
|
||||||
|
put: &atomic.Uint64{},
|
||||||
|
mis: &atomic.Uint64{},
|
||||||
|
ret: &atomic.Uint64{},
|
||||||
|
}
|
||||||
p.p = &sync.Pool{
|
p.p = &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
atomic.AddUint64(&p.mis, 1)
|
p.mis.Add(1)
|
||||||
b := bytes.NewBuffer(make([]byte, 0, size))
|
b := bytes.NewBuffer(make([]byte, 0, size))
|
||||||
return b
|
return b
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
poolsMu.Lock()
|
|
||||||
pools = append(pools, p)
|
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
|
||||||
poolsMu.Unlock()
|
return float64(p.get.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
|
||||||
|
return float64(p.put.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
|
||||||
|
return float64(p.mis.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
|
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
|
||||||
|
return float64(p.ret.Load())
|
||||||
|
}, "capacity", strconv.Itoa(p.c))
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int {
|
|||||||
|
|
||||||
func (p *BytesPool) Stats() Stats {
|
func (p *BytesPool) Stats() Stats {
|
||||||
return Stats{
|
return Stats{
|
||||||
Put: atomic.LoadUint64(&p.put),
|
Put: p.put.Load(),
|
||||||
Get: atomic.LoadUint64(&p.get),
|
Get: p.get.Load(),
|
||||||
Mis: atomic.LoadUint64(&p.mis),
|
Mis: p.mis.Load(),
|
||||||
Ret: atomic.LoadUint64(&p.ret),
|
Ret: p.ret.Load(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *BytesPool) Put(b *bytes.Buffer) {
|
func (p *BytesPool) Put(b *bytes.Buffer) {
|
||||||
|
p.put.Add(1)
|
||||||
if (*b).Cap() > p.c {
|
if (*b).Cap() > p.c {
|
||||||
atomic.AddUint64(&p.ret, 1)
|
p.ret.Add(1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b.Reset()
|
b.Reset()
|
||||||
p.p.Put(b)
|
p.p.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *BytesPool) Close() {
|
||||||
|
unregisterMetrics(p.c)
|
||||||
|
}
|
||||||
|
|
||||||
type StringsPool struct {
|
type StringsPool struct {
|
||||||
p *sync.Pool
|
p *sync.Pool
|
||||||
get uint64
|
get *atomic.Uint64
|
||||||
put uint64
|
put *atomic.Uint64
|
||||||
mis uint64
|
mis *atomic.Uint64
|
||||||
ret uint64
|
ret *atomic.Uint64
|
||||||
c int
|
c int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStringsPool(size int) *StringsPool {
|
func NewStringsPool(size int) *StringsPool {
|
||||||
p := &StringsPool{c: size}
|
p := &StringsPool{
|
||||||
|
c: size,
|
||||||
|
get: &atomic.Uint64{},
|
||||||
|
put: &atomic.Uint64{},
|
||||||
|
mis: &atomic.Uint64{},
|
||||||
|
ret: &atomic.Uint64{},
|
||||||
|
}
|
||||||
p.p = &sync.Pool{
|
p.p = &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
atomic.AddUint64(&p.mis, 1)
|
p.mis.Add(1)
|
||||||
return &strings.Builder{}
|
return &strings.Builder{}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
poolsMu.Lock()
|
|
||||||
pools = append(pools, p)
|
|
||||||
poolsMu.Unlock()
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int {
|
|||||||
|
|
||||||
func (p *StringsPool) Stats() Stats {
|
func (p *StringsPool) Stats() Stats {
|
||||||
return Stats{
|
return Stats{
|
||||||
Put: atomic.LoadUint64(&p.put),
|
Put: p.put.Load(),
|
||||||
Get: atomic.LoadUint64(&p.get),
|
Get: p.get.Load(),
|
||||||
Mis: atomic.LoadUint64(&p.mis),
|
Mis: p.mis.Load(),
|
||||||
Ret: atomic.LoadUint64(&p.ret),
|
Ret: p.ret.Load(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StringsPool) Get() *strings.Builder {
|
func (p *StringsPool) Get() *strings.Builder {
|
||||||
atomic.AddUint64(&p.get, 1)
|
p.get.Add(1)
|
||||||
return p.p.Get().(*strings.Builder)
|
return p.p.Get().(*strings.Builder)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *StringsPool) Put(b *strings.Builder) {
|
func (p *StringsPool) Put(b *strings.Builder) {
|
||||||
atomic.AddUint64(&p.put, 1)
|
p.put.Add(1)
|
||||||
if b.Cap() > p.c {
|
if b.Cap() > p.c {
|
||||||
atomic.AddUint64(&p.ret, 1)
|
p.ret.Add(1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b.Reset()
|
b.Reset()
|
||||||
p.p.Put(b)
|
p.p.Put(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *StringsPool) Close() {
|
||||||
|
unregisterMetrics(p.c)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user