Compare commits

..

2 Commits

Author SHA1 Message Date
3d4198ac42 remove goroutine for db
Some checks failed
coverage / build (pull_request) Failing after 2m5s
lint / lint (pull_request) Successful in 4m47s
test / test (pull_request) Successful in 4m5s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-10-07 00:07:25 +03:00
5dfcc76ce8 meter: add Unregister metric
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-09-22 09:52:17 +03:00

View File

@@ -6,18 +6,18 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/semconv"
) )
func unregisterMetrics(size int) { var (
meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) pools = make([]Statser, 0)
meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) poolsMu sync.Mutex
meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) )
meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size))
}
// Stats struct
type Stats struct { type Stats struct {
Get uint64 Get uint64
Put uint64 Put uint64
@@ -25,13 +25,41 @@ type Stats struct {
Ret uint64 Ret uint64
} }
// Statser provides buffer pool stats
type Statser interface {
Stats() Stats
Cap() int
}
func init() {
go newStatsMeter()
}
func newStatsMeter() {
ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
defer ticker.Stop()
for range ticker.C {
poolsMu.Lock()
for _, st := range pools {
stats := st.Stats()
meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get)
meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put)
meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis)
meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret)
}
poolsMu.Unlock()
}
}
var (
_ Statser = (*BytePool)(nil)
_ Statser = (*BytesPool)(nil)
_ Statser = (*StringsPool)(nil)
)
type Pool[T any] struct { type Pool[T any] struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64
put *atomic.Uint64
mis *atomic.Uint64
ret *atomic.Uint64
c int
} }
func (p Pool[T]) Put(t T) { func (p Pool[T]) Put(t T) {
@@ -42,82 +70,37 @@ func (p Pool[T]) Get() T {
return p.p.Get().(T) return p.p.Get().(T)
} }
func NewPool[T any](fn func() T, size int) Pool[T] { func NewPool[T any](fn func() T) Pool[T] {
p := Pool[T]{ return Pool[T]{
c: size, p: &sync.Pool{
get: &atomic.Uint64{}, New: func() interface{} {
put: &atomic.Uint64{}, return fn()
mis: &atomic.Uint64{}, },
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{
New: func() interface{} {
p.mis.Add(1)
return fn()
}, },
} }
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 {
return float64(p.get.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p
} }
type BytePool struct { type BytePool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytePool(size int) *BytePool { func NewBytePool(size int) *BytePool {
p := &BytePool{ p := &BytePool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := make([]byte, 0, size) b := make([]byte, 0, size)
return &b return &b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -127,73 +110,49 @@ func (p *BytePool) Cap() int {
func (p *BytePool) Stats() Stats { func (p *BytePool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *BytePool) Get() *[]byte { func (p *BytePool) Get() *[]byte {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*[]byte) return p.p.Get().(*[]byte)
} }
func (p *BytePool) Put(b *[]byte) { func (p *BytePool) Put(b *[]byte) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if cap(*b) > p.c { if cap(*b) > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
*b = (*b)[:0] *b = (*b)[:0]
p.p.Put(b) p.p.Put(b)
} }
func (p *BytePool) Close() {
unregisterMetrics(p.c)
}
type BytesPool struct { type BytesPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewBytesPool(size int) *BytesPool { func NewBytesPool(size int) *BytesPool {
p := &BytesPool{ p := &BytesPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
b := bytes.NewBuffer(make([]byte, 0, size)) b := bytes.NewBuffer(make([]byte, 0, size))
return b return b
}, },
} }
poolsMu.Lock()
meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { pools = append(pools, p)
return float64(p.get.Load()) poolsMu.Unlock()
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 {
return float64(p.put.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 {
return float64(p.mis.Load())
}, "capacity", strconv.Itoa(p.c))
meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 {
return float64(p.ret.Load())
}, "capacity", strconv.Itoa(p.c))
return p return p
} }
@@ -203,10 +162,10 @@ func (p *BytesPool) Cap() int {
func (p *BytesPool) Stats() Stats { func (p *BytesPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
@@ -215,43 +174,34 @@ func (p *BytesPool) Get() *bytes.Buffer {
} }
func (p *BytesPool) Put(b *bytes.Buffer) { func (p *BytesPool) Put(b *bytes.Buffer) {
p.put.Add(1)
if (*b).Cap() > p.c { if (*b).Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *BytesPool) Close() {
unregisterMetrics(p.c)
}
type StringsPool struct { type StringsPool struct {
p *sync.Pool p *sync.Pool
get *atomic.Uint64 get uint64
put *atomic.Uint64 put uint64
mis *atomic.Uint64 mis uint64
ret *atomic.Uint64 ret uint64
c int c int
} }
func NewStringsPool(size int) *StringsPool { func NewStringsPool(size int) *StringsPool {
p := &StringsPool{ p := &StringsPool{c: size}
c: size,
get: &atomic.Uint64{},
put: &atomic.Uint64{},
mis: &atomic.Uint64{},
ret: &atomic.Uint64{},
}
p.p = &sync.Pool{ p.p = &sync.Pool{
New: func() interface{} { New: func() interface{} {
p.mis.Add(1) atomic.AddUint64(&p.mis, 1)
return &strings.Builder{} return &strings.Builder{}
}, },
} }
poolsMu.Lock()
pools = append(pools, p)
poolsMu.Unlock()
return p return p
} }
@@ -261,28 +211,24 @@ func (p *StringsPool) Cap() int {
func (p *StringsPool) Stats() Stats { func (p *StringsPool) Stats() Stats {
return Stats{ return Stats{
Put: p.put.Load(), Put: atomic.LoadUint64(&p.put),
Get: p.get.Load(), Get: atomic.LoadUint64(&p.get),
Mis: p.mis.Load(), Mis: atomic.LoadUint64(&p.mis),
Ret: p.ret.Load(), Ret: atomic.LoadUint64(&p.ret),
} }
} }
func (p *StringsPool) Get() *strings.Builder { func (p *StringsPool) Get() *strings.Builder {
p.get.Add(1) atomic.AddUint64(&p.get, 1)
return p.p.Get().(*strings.Builder) return p.p.Get().(*strings.Builder)
} }
func (p *StringsPool) Put(b *strings.Builder) { func (p *StringsPool) Put(b *strings.Builder) {
p.put.Add(1) atomic.AddUint64(&p.put, 1)
if b.Cap() > p.c { if b.Cap() > p.c {
p.ret.Add(1) atomic.AddUint64(&p.ret, 1)
return return
} }
b.Reset() b.Reset()
p.p.Put(b) p.p.Put(b)
} }
func (p *StringsPool) Close() {
unregisterMetrics(p.c)
}