fixup util/xpool
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -6,18 +6,18 @@ import ( | |||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v4/meter" | 	"go.unistack.org/micro/v4/meter" | ||||||
| 	"go.unistack.org/micro/v4/semconv" | 	"go.unistack.org/micro/v4/semconv" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | func unregisterMetrics(size int) { | ||||||
| 	pools   = make([]Statser, 0) | 	meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) | ||||||
| 	poolsMu sync.Mutex | 	meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) | ||||||
| ) | 	meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) | ||||||
|  | 	meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size)) | ||||||
|  | } | ||||||
|  |  | ||||||
| // Stats struct |  | ||||||
| type Stats struct { | type Stats struct { | ||||||
| 	Get uint64 | 	Get uint64 | ||||||
| 	Put uint64 | 	Put uint64 | ||||||
| @@ -25,41 +25,13 @@ type Stats struct { | |||||||
| 	Ret uint64 | 	Ret uint64 | ||||||
| } | } | ||||||
|  |  | ||||||
| // Statser provides buffer pool stats |  | ||||||
| type Statser interface { |  | ||||||
| 	Stats() Stats |  | ||||||
| 	Cap() int |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func init() { |  | ||||||
| 	go newStatsMeter() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func newStatsMeter() { |  | ||||||
| 	ticker := time.NewTicker(meter.DefaultMeterStatsInterval) |  | ||||||
| 	defer ticker.Stop() |  | ||||||
|  |  | ||||||
| 	for range ticker.C { |  | ||||||
| 		poolsMu.Lock() |  | ||||||
| 		for _, st := range pools { |  | ||||||
| 			stats := st.Stats() |  | ||||||
| 			meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) |  | ||||||
| 			meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) |  | ||||||
| 			meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) |  | ||||||
| 			meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) |  | ||||||
| 		} |  | ||||||
| 		poolsMu.Unlock() |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| var ( |  | ||||||
| 	_ Statser = (*BytePool)(nil) |  | ||||||
| 	_ Statser = (*BytesPool)(nil) |  | ||||||
| 	_ Statser = (*StringsPool)(nil) |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| type Pool[T any] struct { | type Pool[T any] struct { | ||||||
| 	p   *sync.Pool | 	p   *sync.Pool | ||||||
|  | 	get *atomic.Uint64 | ||||||
|  | 	put *atomic.Uint64 | ||||||
|  | 	mis *atomic.Uint64 | ||||||
|  | 	ret *atomic.Uint64 | ||||||
|  | 	c   int | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p Pool[T]) Put(t T) { | func (p Pool[T]) Put(t T) { | ||||||
| @@ -70,37 +42,82 @@ func (p Pool[T]) Get() T { | |||||||
| 	return p.p.Get().(T) | 	return p.p.Get().(T) | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewPool[T any](fn func() T) Pool[T] { | func NewPool[T any](fn func() T, size int) Pool[T] { | ||||||
| 	return Pool[T]{ | 	p := Pool[T]{ | ||||||
| 		p: &sync.Pool{ | 		c:   size, | ||||||
|  | 		get: &atomic.Uint64{}, | ||||||
|  | 		put: &atomic.Uint64{}, | ||||||
|  | 		mis: &atomic.Uint64{}, | ||||||
|  | 		ret: &atomic.Uint64{}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	p.p = &sync.Pool{ | ||||||
| 		New: func() interface{} { | 		New: func() interface{} { | ||||||
|  | 			p.mis.Add(1) | ||||||
| 			return fn() | 			return fn() | ||||||
| 		}, | 		}, | ||||||
| 		}, |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { | ||||||
|  | 		return float64(p.get.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { | ||||||
|  | 		return float64(p.put.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { | ||||||
|  | 		return float64(p.mis.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { | ||||||
|  | 		return float64(p.ret.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	return p | ||||||
| } | } | ||||||
|  |  | ||||||
| type BytePool struct { | type BytePool struct { | ||||||
| 	p   *sync.Pool | 	p   *sync.Pool | ||||||
| 	get uint64 | 	get *atomic.Uint64 | ||||||
| 	put uint64 | 	put *atomic.Uint64 | ||||||
| 	mis uint64 | 	mis *atomic.Uint64 | ||||||
| 	ret uint64 | 	ret *atomic.Uint64 | ||||||
| 	c   int | 	c   int | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewBytePool(size int) *BytePool { | func NewBytePool(size int) *BytePool { | ||||||
| 	p := &BytePool{c: size} | 	p := &BytePool{ | ||||||
|  | 		c:   size, | ||||||
|  | 		get: &atomic.Uint64{}, | ||||||
|  | 		put: &atomic.Uint64{}, | ||||||
|  | 		mis: &atomic.Uint64{}, | ||||||
|  | 		ret: &atomic.Uint64{}, | ||||||
|  | 	} | ||||||
| 	p.p = &sync.Pool{ | 	p.p = &sync.Pool{ | ||||||
| 		New: func() interface{} { | 		New: func() interface{} { | ||||||
| 			atomic.AddUint64(&p.mis, 1) | 			p.mis.Add(1) | ||||||
| 			b := make([]byte, 0, size) | 			b := make([]byte, 0, size) | ||||||
| 			return &b | 			return &b | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	poolsMu.Lock() |  | ||||||
| 	pools = append(pools, p) | 	meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { | ||||||
| 	poolsMu.Unlock() | 		return float64(p.get.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { | ||||||
|  | 		return float64(p.put.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { | ||||||
|  | 		return float64(p.mis.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { | ||||||
|  | 		return float64(p.ret.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
| 	return p | 	return p | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -110,49 +127,73 @@ func (p *BytePool) Cap() int { | |||||||
|  |  | ||||||
| func (p *BytePool) Stats() Stats { | func (p *BytePool) Stats() Stats { | ||||||
| 	return Stats{ | 	return Stats{ | ||||||
| 		Put: atomic.LoadUint64(&p.put), | 		Put: p.put.Load(), | ||||||
| 		Get: atomic.LoadUint64(&p.get), | 		Get: p.get.Load(), | ||||||
| 		Mis: atomic.LoadUint64(&p.mis), | 		Mis: p.mis.Load(), | ||||||
| 		Ret: atomic.LoadUint64(&p.ret), | 		Ret: p.ret.Load(), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *BytePool) Get() *[]byte { | func (p *BytePool) Get() *[]byte { | ||||||
| 	atomic.AddUint64(&p.get, 1) | 	p.get.Add(1) | ||||||
| 	return p.p.Get().(*[]byte) | 	return p.p.Get().(*[]byte) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *BytePool) Put(b *[]byte) { | func (p *BytePool) Put(b *[]byte) { | ||||||
| 	atomic.AddUint64(&p.put, 1) | 	p.put.Add(1) | ||||||
| 	if cap(*b) > p.c { | 	if cap(*b) > p.c { | ||||||
| 		atomic.AddUint64(&p.ret, 1) | 		p.ret.Add(1) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	*b = (*b)[:0] | 	*b = (*b)[:0] | ||||||
| 	p.p.Put(b) | 	p.p.Put(b) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (p *BytePool) Close() { | ||||||
|  | 	unregisterMetrics(p.c) | ||||||
|  | } | ||||||
|  |  | ||||||
| type BytesPool struct { | type BytesPool struct { | ||||||
| 	p   *sync.Pool | 	p   *sync.Pool | ||||||
| 	get uint64 | 	get *atomic.Uint64 | ||||||
| 	put uint64 | 	put *atomic.Uint64 | ||||||
| 	mis uint64 | 	mis *atomic.Uint64 | ||||||
| 	ret uint64 | 	ret *atomic.Uint64 | ||||||
| 	c   int | 	c   int | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewBytesPool(size int) *BytesPool { | func NewBytesPool(size int) *BytesPool { | ||||||
| 	p := &BytesPool{c: size} | 	p := &BytesPool{ | ||||||
|  | 		c:   size, | ||||||
|  | 		get: &atomic.Uint64{}, | ||||||
|  | 		put: &atomic.Uint64{}, | ||||||
|  | 		mis: &atomic.Uint64{}, | ||||||
|  | 		ret: &atomic.Uint64{}, | ||||||
|  | 	} | ||||||
| 	p.p = &sync.Pool{ | 	p.p = &sync.Pool{ | ||||||
| 		New: func() interface{} { | 		New: func() interface{} { | ||||||
| 			atomic.AddUint64(&p.mis, 1) | 			p.mis.Add(1) | ||||||
| 			b := bytes.NewBuffer(make([]byte, 0, size)) | 			b := bytes.NewBuffer(make([]byte, 0, size)) | ||||||
| 			return b | 			return b | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	poolsMu.Lock() |  | ||||||
| 	pools = append(pools, p) | 	meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { | ||||||
| 	poolsMu.Unlock() | 		return float64(p.get.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { | ||||||
|  | 		return float64(p.put.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { | ||||||
|  | 		return float64(p.mis.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
|  | 	meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { | ||||||
|  | 		return float64(p.ret.Load()) | ||||||
|  | 	}, "capacity", strconv.Itoa(p.c)) | ||||||
|  |  | ||||||
| 	return p | 	return p | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int { | |||||||
|  |  | ||||||
| func (p *BytesPool) Stats() Stats { | func (p *BytesPool) Stats() Stats { | ||||||
| 	return Stats{ | 	return Stats{ | ||||||
| 		Put: atomic.LoadUint64(&p.put), | 		Put: p.put.Load(), | ||||||
| 		Get: atomic.LoadUint64(&p.get), | 		Get: p.get.Load(), | ||||||
| 		Mis: atomic.LoadUint64(&p.mis), | 		Mis: p.mis.Load(), | ||||||
| 		Ret: atomic.LoadUint64(&p.ret), | 		Ret: p.ret.Load(), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (p *BytesPool) Put(b *bytes.Buffer) { | func (p *BytesPool) Put(b *bytes.Buffer) { | ||||||
|  | 	p.put.Add(1) | ||||||
| 	if (*b).Cap() > p.c { | 	if (*b).Cap() > p.c { | ||||||
| 		atomic.AddUint64(&p.ret, 1) | 		p.ret.Add(1) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	b.Reset() | 	b.Reset() | ||||||
| 	p.p.Put(b) | 	p.p.Put(b) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (p *BytesPool) Close() { | ||||||
|  | 	unregisterMetrics(p.c) | ||||||
|  | } | ||||||
|  |  | ||||||
| type StringsPool struct { | type StringsPool struct { | ||||||
| 	p   *sync.Pool | 	p   *sync.Pool | ||||||
| 	get uint64 | 	get *atomic.Uint64 | ||||||
| 	put uint64 | 	put *atomic.Uint64 | ||||||
| 	mis uint64 | 	mis *atomic.Uint64 | ||||||
| 	ret uint64 | 	ret *atomic.Uint64 | ||||||
| 	c   int | 	c   int | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewStringsPool(size int) *StringsPool { | func NewStringsPool(size int) *StringsPool { | ||||||
| 	p := &StringsPool{c: size} | 	p := &StringsPool{ | ||||||
|  | 		c:   size, | ||||||
|  | 		get: &atomic.Uint64{}, | ||||||
|  | 		put: &atomic.Uint64{}, | ||||||
|  | 		mis: &atomic.Uint64{}, | ||||||
|  | 		ret: &atomic.Uint64{}, | ||||||
|  | 	} | ||||||
| 	p.p = &sync.Pool{ | 	p.p = &sync.Pool{ | ||||||
| 		New: func() interface{} { | 		New: func() interface{} { | ||||||
| 			atomic.AddUint64(&p.mis, 1) | 			p.mis.Add(1) | ||||||
| 			return &strings.Builder{} | 			return &strings.Builder{} | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	poolsMu.Lock() |  | ||||||
| 	pools = append(pools, p) |  | ||||||
| 	poolsMu.Unlock() |  | ||||||
| 	return p | 	return p | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int { | |||||||
|  |  | ||||||
| func (p *StringsPool) Stats() Stats { | func (p *StringsPool) Stats() Stats { | ||||||
| 	return Stats{ | 	return Stats{ | ||||||
| 		Put: atomic.LoadUint64(&p.put), | 		Put: p.put.Load(), | ||||||
| 		Get: atomic.LoadUint64(&p.get), | 		Get: p.get.Load(), | ||||||
| 		Mis: atomic.LoadUint64(&p.mis), | 		Mis: p.mis.Load(), | ||||||
| 		Ret: atomic.LoadUint64(&p.ret), | 		Ret: p.ret.Load(), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *StringsPool) Get() *strings.Builder { | func (p *StringsPool) Get() *strings.Builder { | ||||||
| 	atomic.AddUint64(&p.get, 1) | 	p.get.Add(1) | ||||||
| 	return p.p.Get().(*strings.Builder) | 	return p.p.Get().(*strings.Builder) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *StringsPool) Put(b *strings.Builder) { | func (p *StringsPool) Put(b *strings.Builder) { | ||||||
| 	atomic.AddUint64(&p.put, 1) | 	p.put.Add(1) | ||||||
| 	if b.Cap() > p.c { | 	if b.Cap() > p.c { | ||||||
| 		atomic.AddUint64(&p.ret, 1) | 		p.ret.Add(1) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	b.Reset() | 	b.Reset() | ||||||
| 	p.p.Put(b) | 	p.p.Put(b) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (p *StringsPool) Close() { | ||||||
|  | 	unregisterMetrics(p.c) | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user