diff --git a/util/xpool/pool.go b/util/xpool/pool.go index d6181ca4..4312b12b 100644 --- a/util/xpool/pool.go +++ b/util/xpool/pool.go @@ -6,18 +6,18 @@ import ( "strings" "sync" "sync/atomic" - "time" "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/semconv" ) -var ( - pools = make([]Statser, 0) - poolsMu sync.Mutex -) +func unregisterMetrics(size int) { + meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) + meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) + meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) + meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size)) +} -// Stats struct type Stats struct { Get uint64 Put uint64 @@ -25,41 +25,13 @@ type Stats struct { Ret uint64 } -// Statser provides buffer pool stats -type Statser interface { - Stats() Stats - Cap() int -} - -func init() { - go newStatsMeter() -} - -func newStatsMeter() { - ticker := time.NewTicker(meter.DefaultMeterStatsInterval) - defer ticker.Stop() - - for range ticker.C { - poolsMu.Lock() - for _, st := range pools { - stats := st.Stats() - meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) - meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) - meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) - meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) - } - poolsMu.Unlock() - } -} - -var ( - _ Statser = (*BytePool)(nil) - _ Statser = (*BytesPool)(nil) - _ Statser = (*StringsPool)(nil) -) - type Pool[T any] struct { - p *sync.Pool + p *sync.Pool + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 + c int } func (p Pool[T]) Put(t T) { @@ -70,37 +42,82 @@ func (p Pool[T]) Get() T { return p.p.Get().(T) } -func NewPool[T any](fn func() T) Pool[T] { - return Pool[T]{ - p: &sync.Pool{ - New: func() interface{} { - return fn() - }, +func NewPool[T any](fn func() T, size int) Pool[T] { + p := Pool[T]{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } + + p.p = &sync.Pool{ + New: func() interface{} { + p.mis.Add(1) + return fn() }, } + + meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { + return float64(p.get.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { + return float64(p.put.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { + return float64(p.mis.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { + return float64(p.ret.Load()) + }, "capacity", strconv.Itoa(p.c)) + + return p } type BytePool struct { p *sync.Pool - get uint64 - put uint64 - mis uint64 - ret uint64 + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 c int } func NewBytePool(size int) *BytePool { - p := &BytePool{c: size} + p := &BytePool{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } p.p = &sync.Pool{ New: func() interface{} { - atomic.AddUint64(&p.mis, 1) + p.mis.Add(1) b := make([]byte, 0, size) return &b }, } - poolsMu.Lock() - pools = append(pools, p) - poolsMu.Unlock() + + meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { + return float64(p.get.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { + return float64(p.put.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { + return float64(p.mis.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { + return float64(p.ret.Load()) + }, "capacity", strconv.Itoa(p.c)) + return p } @@ -110,49 +127,73 @@ func (p *BytePool) Cap() int { func (p *BytePool) Stats() Stats { return Stats{ - Put: atomic.LoadUint64(&p.put), - Get: atomic.LoadUint64(&p.get), - Mis: atomic.LoadUint64(&p.mis), - Ret: atomic.LoadUint64(&p.ret), + Put: p.put.Load(), + Get: p.get.Load(), + Mis: p.mis.Load(), + Ret: p.ret.Load(), } } func (p *BytePool) Get() *[]byte { - atomic.AddUint64(&p.get, 1) + p.get.Add(1) return p.p.Get().(*[]byte) } func (p *BytePool) Put(b *[]byte) { - atomic.AddUint64(&p.put, 1) + p.put.Add(1) if cap(*b) > p.c { - atomic.AddUint64(&p.ret, 1) + p.ret.Add(1) return } *b = (*b)[:0] p.p.Put(b) } +func (p *BytePool) Close() { + unregisterMetrics(p.c) +} + type BytesPool struct { p *sync.Pool - get uint64 - put uint64 - mis uint64 - ret uint64 + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 c int } func NewBytesPool(size int) *BytesPool { - p := &BytesPool{c: size} + p := &BytesPool{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } p.p = &sync.Pool{ New: func() interface{} { - atomic.AddUint64(&p.mis, 1) + p.mis.Add(1) b := bytes.NewBuffer(make([]byte, 0, size)) return b }, } - poolsMu.Lock() - pools = append(pools, p) - poolsMu.Unlock() + + meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { + return float64(p.get.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { + return float64(p.put.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { + return float64(p.mis.Load()) + }, "capacity", strconv.Itoa(p.c)) + + meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { + return float64(p.ret.Load()) + }, "capacity", strconv.Itoa(p.c)) + return p } @@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int { func (p *BytesPool) Stats() Stats { return Stats{ - Put: atomic.LoadUint64(&p.put), - Get: atomic.LoadUint64(&p.get), - Mis: atomic.LoadUint64(&p.mis), - Ret: atomic.LoadUint64(&p.ret), + Put: p.put.Load(), + Get: p.get.Load(), + Mis: p.mis.Load(), + Ret: p.ret.Load(), } } @@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer { } func (p *BytesPool) Put(b *bytes.Buffer) { + p.put.Add(1) if (*b).Cap() > p.c { - atomic.AddUint64(&p.ret, 1) + p.ret.Add(1) return } b.Reset() p.p.Put(b) } +func (p *BytesPool) Close() { + unregisterMetrics(p.c) +} + type StringsPool struct { p *sync.Pool - get uint64 - put uint64 - mis uint64 - ret uint64 + get *atomic.Uint64 + put *atomic.Uint64 + mis *atomic.Uint64 + ret *atomic.Uint64 c int } func NewStringsPool(size int) *StringsPool { - p := &StringsPool{c: size} + p := &StringsPool{ + c: size, + get: &atomic.Uint64{}, + put: &atomic.Uint64{}, + mis: &atomic.Uint64{}, + ret: &atomic.Uint64{}, + } p.p = &sync.Pool{ New: func() interface{} { - atomic.AddUint64(&p.mis, 1) + p.mis.Add(1) return &strings.Builder{} }, } - poolsMu.Lock() - pools = append(pools, p) - poolsMu.Unlock() + return p } @@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int { func (p *StringsPool) Stats() Stats { return Stats{ - Put: atomic.LoadUint64(&p.put), - Get: atomic.LoadUint64(&p.get), - Mis: atomic.LoadUint64(&p.mis), - Ret: atomic.LoadUint64(&p.ret), + Put: p.put.Load(), + Get: p.get.Load(), + Mis: p.mis.Load(), + Ret: p.ret.Load(), } } func (p *StringsPool) Get() *strings.Builder { - atomic.AddUint64(&p.get, 1) + p.get.Add(1) return p.p.Get().(*strings.Builder) } func (p *StringsPool) Put(b *strings.Builder) { - atomic.AddUint64(&p.put, 1) + p.put.Add(1) if b.Cap() > p.c { - atomic.AddUint64(&p.ret, 1) + p.ret.Add(1) return } b.Reset() p.p.Put(b) } + +func (p *StringsPool) Close() { + unregisterMetrics(p.c) +}