meter: improve meter usage across micro framework #409
| @@ -3,6 +3,7 @@ package sql | ||||
| import ( | ||||
| 	"context" | ||||
| 	"database/sql" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| @@ -11,31 +12,84 @@ type Statser interface { | ||||
| } | ||||
|  | ||||
| func NewStatsMeter(ctx context.Context, db Statser, opts ...Option) { | ||||
| 	options := NewOptions(opts...) | ||||
|  | ||||
| 	go func() { | ||||
| 		ticker := time.NewTicker(options.MeterStatsInterval) | ||||
| 		defer ticker.Stop() | ||||
|  | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-ctx.Done(): | ||||
| 				return | ||||
| 			case <-ticker.C: | ||||
| 	if db == nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	options := NewOptions(opts...) | ||||
|  | ||||
| 	var ( | ||||
| 		statsMu                                                     sync.Mutex | ||||
| 		lastUpdated                                                 time.Time | ||||
| 		maxOpenConnections, openConnections, inUse, idle, waitCount float64 | ||||
| 		maxIdleClosed, maxIdleTimeClosed, maxLifetimeClosed         float64 | ||||
| 		waitDuration                                                float64 | ||||
| 	) | ||||
|  | ||||
| 	updateFn := func() { | ||||
| 		statsMu.Lock() | ||||
| 		defer statsMu.Unlock() | ||||
|  | ||||
| 		if time.Since(lastUpdated) < options.MeterStatsInterval { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		stats := db.Stats() | ||||
| 				options.Meter.Counter(MaxOpenConnections).Set(uint64(stats.MaxOpenConnections)) | ||||
| 				options.Meter.Counter(OpenConnections).Set(uint64(stats.OpenConnections)) | ||||
| 				options.Meter.Counter(InuseConnections).Set(uint64(stats.InUse)) | ||||
| 				options.Meter.Counter(IdleConnections).Set(uint64(stats.Idle)) | ||||
| 				options.Meter.Counter(WaitConnections).Set(uint64(stats.WaitCount)) | ||||
| 				options.Meter.FloatCounter(BlockedSeconds).Set(stats.WaitDuration.Seconds()) | ||||
| 				options.Meter.Counter(MaxIdleClosed).Set(uint64(stats.MaxIdleClosed)) | ||||
| 				options.Meter.Counter(MaxIdletimeClosed).Set(uint64(stats.MaxIdleTimeClosed)) | ||||
| 				options.Meter.Counter(MaxLifetimeClosed).Set(uint64(stats.MaxLifetimeClosed)) | ||||
| 		maxOpenConnections = float64(stats.MaxOpenConnections) | ||||
| 		openConnections = float64(stats.OpenConnections) | ||||
| 		inUse = float64(stats.InUse) | ||||
| 		idle = float64(stats.Idle) | ||||
| 		waitCount = float64(stats.WaitCount) | ||||
| 		maxIdleClosed = float64(stats.MaxIdleClosed) | ||||
| 		maxIdleTimeClosed = float64(stats.MaxIdleTimeClosed) | ||||
| 		maxLifetimeClosed = float64(stats.MaxLifetimeClosed) | ||||
| 		waitDuration = float64(stats.WaitDuration.Seconds()) | ||||
|  | ||||
| 		lastUpdated = time.Now() | ||||
| 	} | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	options.Meter.Gauge(MaxOpenConnections, func() float64 { | ||||
| 		updateFn() | ||||
| 		return maxOpenConnections | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(OpenConnections, func() float64 { | ||||
| 		updateFn() | ||||
| 		return openConnections | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(InuseConnections, func() float64 { | ||||
| 		updateFn() | ||||
| 		return inUse | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(IdleConnections, func() float64 { | ||||
| 		updateFn() | ||||
| 		return idle | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(WaitConnections, func() float64 { | ||||
| 		updateFn() | ||||
| 		return waitCount | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(BlockedSeconds, func() float64 { | ||||
| 		updateFn() | ||||
| 		return waitDuration | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(MaxIdleClosed, func() float64 { | ||||
| 		updateFn() | ||||
| 		return maxIdleClosed | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(MaxIdletimeClosed, func() float64 { | ||||
| 		updateFn() | ||||
| 		return maxIdleTimeClosed | ||||
| 	}) | ||||
|  | ||||
| 	options.Meter.Gauge(MaxLifetimeClosed, func() float64 { | ||||
| 		updateFn() | ||||
| 		return maxLifetimeClosed | ||||
| 	}) | ||||
| } | ||||
|   | ||||
| @@ -59,6 +59,8 @@ type Meter interface { | ||||
| 	Options() Options | ||||
| 	// String return meter type | ||||
| 	String() string | ||||
| 	// Unregister metric name and drop all data | ||||
| 	Unregister(name string, labels ...string) bool | ||||
| } | ||||
|  | ||||
| // Counter is a counter | ||||
|   | ||||
| @@ -28,6 +28,10 @@ func (r *noopMeter) Name() string { | ||||
| 	return r.opts.Name | ||||
| } | ||||
|  | ||||
| func (r *noopMeter) Unregister(name string, labels ...string) bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| // Init initialize options | ||||
| func (r *noopMeter) Init(opts ...Option) error { | ||||
| 	for _, o := range opts { | ||||
|   | ||||
| @@ -6,18 +6,18 @@ import ( | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.unistack.org/micro/v4/meter" | ||||
| 	"go.unistack.org/micro/v4/semconv" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	pools   = make([]Statser, 0) | ||||
| 	poolsMu sync.Mutex | ||||
| ) | ||||
| func unregisterMetrics(size int) { | ||||
| 	meter.DefaultMeter.Unregister(semconv.PoolGetTotal, "capacity", strconv.Itoa(size)) | ||||
| 	meter.DefaultMeter.Unregister(semconv.PoolPutTotal, "capacity", strconv.Itoa(size)) | ||||
| 	meter.DefaultMeter.Unregister(semconv.PoolMisTotal, "capacity", strconv.Itoa(size)) | ||||
| 	meter.DefaultMeter.Unregister(semconv.PoolRetTotal, "capacity", strconv.Itoa(size)) | ||||
| } | ||||
|  | ||||
| // Stats struct | ||||
| type Stats struct { | ||||
| 	Get uint64 | ||||
| 	Put uint64 | ||||
| @@ -25,41 +25,13 @@ type Stats struct { | ||||
| 	Ret uint64 | ||||
| } | ||||
|  | ||||
| // Statser provides buffer pool stats | ||||
| type Statser interface { | ||||
| 	Stats() Stats | ||||
| 	Cap() int | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	go newStatsMeter() | ||||
| } | ||||
|  | ||||
| func newStatsMeter() { | ||||
| 	ticker := time.NewTicker(meter.DefaultMeterStatsInterval) | ||||
| 	defer ticker.Stop() | ||||
|  | ||||
| 	for range ticker.C { | ||||
| 		poolsMu.Lock() | ||||
| 		for _, st := range pools { | ||||
| 			stats := st.Stats() | ||||
| 			meter.DefaultMeter.Counter(semconv.PoolGetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Get) | ||||
| 			meter.DefaultMeter.Counter(semconv.PoolPutTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Put) | ||||
| 			meter.DefaultMeter.Counter(semconv.PoolMisTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Mis) | ||||
| 			meter.DefaultMeter.Counter(semconv.PoolRetTotal, "capacity", strconv.Itoa(st.Cap())).Set(stats.Ret) | ||||
| 		} | ||||
| 		poolsMu.Unlock() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	_ Statser = (*BytePool)(nil) | ||||
| 	_ Statser = (*BytesPool)(nil) | ||||
| 	_ Statser = (*StringsPool)(nil) | ||||
| ) | ||||
|  | ||||
| type Pool[T any] struct { | ||||
| 	p   *sync.Pool | ||||
| 	get *atomic.Uint64 | ||||
| 	put *atomic.Uint64 | ||||
| 	mis *atomic.Uint64 | ||||
| 	ret *atomic.Uint64 | ||||
| 	c   int | ||||
| } | ||||
|  | ||||
| func (p Pool[T]) Put(t T) { | ||||
| @@ -70,37 +42,82 @@ func (p Pool[T]) Get() T { | ||||
| 	return p.p.Get().(T) | ||||
| } | ||||
|  | ||||
| func NewPool[T any](fn func() T) Pool[T] { | ||||
| 	return Pool[T]{ | ||||
| 		p: &sync.Pool{ | ||||
| func NewPool[T any](fn func() T, size int) Pool[T] { | ||||
| 	p := Pool[T]{ | ||||
| 		c:   size, | ||||
| 		get: &atomic.Uint64{}, | ||||
| 		put: &atomic.Uint64{}, | ||||
| 		mis: &atomic.Uint64{}, | ||||
| 		ret: &atomic.Uint64{}, | ||||
| 	} | ||||
|  | ||||
| 	p.p = &sync.Pool{ | ||||
| 		New: func() interface{} { | ||||
| 			p.mis.Add(1) | ||||
| 			return fn() | ||||
| 		}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { | ||||
| 		return float64(p.get.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { | ||||
| 		return float64(p.put.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { | ||||
| 		return float64(p.mis.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { | ||||
| 		return float64(p.ret.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	return p | ||||
| } | ||||
|  | ||||
| type BytePool struct { | ||||
| 	p   *sync.Pool | ||||
| 	get uint64 | ||||
| 	put uint64 | ||||
| 	mis uint64 | ||||
| 	ret uint64 | ||||
| 	get *atomic.Uint64 | ||||
| 	put *atomic.Uint64 | ||||
| 	mis *atomic.Uint64 | ||||
| 	ret *atomic.Uint64 | ||||
| 	c   int | ||||
| } | ||||
|  | ||||
| func NewBytePool(size int) *BytePool { | ||||
| 	p := &BytePool{c: size} | ||||
| 	p := &BytePool{ | ||||
| 		c:   size, | ||||
| 		get: &atomic.Uint64{}, | ||||
| 		put: &atomic.Uint64{}, | ||||
| 		mis: &atomic.Uint64{}, | ||||
| 		ret: &atomic.Uint64{}, | ||||
| 	} | ||||
| 	p.p = &sync.Pool{ | ||||
| 		New: func() interface{} { | ||||
| 			atomic.AddUint64(&p.mis, 1) | ||||
| 			p.mis.Add(1) | ||||
| 			b := make([]byte, 0, size) | ||||
| 			return &b | ||||
| 		}, | ||||
| 	} | ||||
| 	poolsMu.Lock() | ||||
| 	pools = append(pools, p) | ||||
| 	poolsMu.Unlock() | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { | ||||
| 		return float64(p.get.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { | ||||
| 		return float64(p.put.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { | ||||
| 		return float64(p.mis.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { | ||||
| 		return float64(p.ret.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	return p | ||||
| } | ||||
|  | ||||
| @@ -110,49 +127,73 @@ func (p *BytePool) Cap() int { | ||||
|  | ||||
| func (p *BytePool) Stats() Stats { | ||||
| 	return Stats{ | ||||
| 		Put: atomic.LoadUint64(&p.put), | ||||
| 		Get: atomic.LoadUint64(&p.get), | ||||
| 		Mis: atomic.LoadUint64(&p.mis), | ||||
| 		Ret: atomic.LoadUint64(&p.ret), | ||||
| 		Put: p.put.Load(), | ||||
| 		Get: p.get.Load(), | ||||
| 		Mis: p.mis.Load(), | ||||
| 		Ret: p.ret.Load(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *BytePool) Get() *[]byte { | ||||
| 	atomic.AddUint64(&p.get, 1) | ||||
| 	p.get.Add(1) | ||||
| 	return p.p.Get().(*[]byte) | ||||
| } | ||||
|  | ||||
| func (p *BytePool) Put(b *[]byte) { | ||||
| 	atomic.AddUint64(&p.put, 1) | ||||
| 	p.put.Add(1) | ||||
| 	if cap(*b) > p.c { | ||||
| 		atomic.AddUint64(&p.ret, 1) | ||||
| 		p.ret.Add(1) | ||||
| 		return | ||||
| 	} | ||||
| 	*b = (*b)[:0] | ||||
| 	p.p.Put(b) | ||||
| } | ||||
|  | ||||
| func (p *BytePool) Close() { | ||||
| 	unregisterMetrics(p.c) | ||||
| } | ||||
|  | ||||
| type BytesPool struct { | ||||
| 	p   *sync.Pool | ||||
| 	get uint64 | ||||
| 	put uint64 | ||||
| 	mis uint64 | ||||
| 	ret uint64 | ||||
| 	get *atomic.Uint64 | ||||
| 	put *atomic.Uint64 | ||||
| 	mis *atomic.Uint64 | ||||
| 	ret *atomic.Uint64 | ||||
| 	c   int | ||||
| } | ||||
|  | ||||
| func NewBytesPool(size int) *BytesPool { | ||||
| 	p := &BytesPool{c: size} | ||||
| 	p := &BytesPool{ | ||||
| 		c:   size, | ||||
| 		get: &atomic.Uint64{}, | ||||
| 		put: &atomic.Uint64{}, | ||||
| 		mis: &atomic.Uint64{}, | ||||
| 		ret: &atomic.Uint64{}, | ||||
| 	} | ||||
| 	p.p = &sync.Pool{ | ||||
| 		New: func() interface{} { | ||||
| 			atomic.AddUint64(&p.mis, 1) | ||||
| 			p.mis.Add(1) | ||||
| 			b := bytes.NewBuffer(make([]byte, 0, size)) | ||||
| 			return b | ||||
| 		}, | ||||
| 	} | ||||
| 	poolsMu.Lock() | ||||
| 	pools = append(pools, p) | ||||
| 	poolsMu.Unlock() | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolGetTotal, func() float64 { | ||||
| 		return float64(p.get.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolPutTotal, func() float64 { | ||||
| 		return float64(p.put.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolMisTotal, func() float64 { | ||||
| 		return float64(p.mis.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	meter.DefaultMeter.Gauge(semconv.PoolRetTotal, func() float64 { | ||||
| 		return float64(p.ret.Load()) | ||||
| 	}, "capacity", strconv.Itoa(p.c)) | ||||
|  | ||||
| 	return p | ||||
| } | ||||
|  | ||||
| @@ -162,10 +203,10 @@ func (p *BytesPool) Cap() int { | ||||
|  | ||||
| func (p *BytesPool) Stats() Stats { | ||||
| 	return Stats{ | ||||
| 		Put: atomic.LoadUint64(&p.put), | ||||
| 		Get: atomic.LoadUint64(&p.get), | ||||
| 		Mis: atomic.LoadUint64(&p.mis), | ||||
| 		Ret: atomic.LoadUint64(&p.ret), | ||||
| 		Put: p.put.Load(), | ||||
| 		Get: p.get.Load(), | ||||
| 		Mis: p.mis.Load(), | ||||
| 		Ret: p.ret.Load(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -174,34 +215,43 @@ func (p *BytesPool) Get() *bytes.Buffer { | ||||
| } | ||||
|  | ||||
| func (p *BytesPool) Put(b *bytes.Buffer) { | ||||
| 	p.put.Add(1) | ||||
| 	if (*b).Cap() > p.c { | ||||
| 		atomic.AddUint64(&p.ret, 1) | ||||
| 		p.ret.Add(1) | ||||
| 		return | ||||
| 	} | ||||
| 	b.Reset() | ||||
| 	p.p.Put(b) | ||||
| } | ||||
|  | ||||
| func (p *BytesPool) Close() { | ||||
| 	unregisterMetrics(p.c) | ||||
| } | ||||
|  | ||||
| type StringsPool struct { | ||||
| 	p   *sync.Pool | ||||
| 	get uint64 | ||||
| 	put uint64 | ||||
| 	mis uint64 | ||||
| 	ret uint64 | ||||
| 	get *atomic.Uint64 | ||||
| 	put *atomic.Uint64 | ||||
| 	mis *atomic.Uint64 | ||||
| 	ret *atomic.Uint64 | ||||
| 	c   int | ||||
| } | ||||
|  | ||||
| func NewStringsPool(size int) *StringsPool { | ||||
| 	p := &StringsPool{c: size} | ||||
| 	p := &StringsPool{ | ||||
| 		c:   size, | ||||
| 		get: &atomic.Uint64{}, | ||||
| 		put: &atomic.Uint64{}, | ||||
| 		mis: &atomic.Uint64{}, | ||||
| 		ret: &atomic.Uint64{}, | ||||
| 	} | ||||
| 	p.p = &sync.Pool{ | ||||
| 		New: func() interface{} { | ||||
| 			atomic.AddUint64(&p.mis, 1) | ||||
| 			p.mis.Add(1) | ||||
| 			return &strings.Builder{} | ||||
| 		}, | ||||
| 	} | ||||
| 	poolsMu.Lock() | ||||
| 	pools = append(pools, p) | ||||
| 	poolsMu.Unlock() | ||||
|  | ||||
| 	return p | ||||
| } | ||||
|  | ||||
| @@ -211,24 +261,28 @@ func (p *StringsPool) Cap() int { | ||||
|  | ||||
| func (p *StringsPool) Stats() Stats { | ||||
| 	return Stats{ | ||||
| 		Put: atomic.LoadUint64(&p.put), | ||||
| 		Get: atomic.LoadUint64(&p.get), | ||||
| 		Mis: atomic.LoadUint64(&p.mis), | ||||
| 		Ret: atomic.LoadUint64(&p.ret), | ||||
| 		Put: p.put.Load(), | ||||
| 		Get: p.get.Load(), | ||||
| 		Mis: p.mis.Load(), | ||||
| 		Ret: p.ret.Load(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (p *StringsPool) Get() *strings.Builder { | ||||
| 	atomic.AddUint64(&p.get, 1) | ||||
| 	p.get.Add(1) | ||||
| 	return p.p.Get().(*strings.Builder) | ||||
| } | ||||
|  | ||||
| func (p *StringsPool) Put(b *strings.Builder) { | ||||
| 	atomic.AddUint64(&p.put, 1) | ||||
| 	p.put.Add(1) | ||||
| 	if b.Cap() > p.c { | ||||
| 		atomic.AddUint64(&p.ret, 1) | ||||
| 		p.ret.Add(1) | ||||
| 		return | ||||
| 	} | ||||
| 	b.Reset() | ||||
| 	p.p.Put(b) | ||||
| } | ||||
|  | ||||
| func (p *StringsPool) Close() { | ||||
| 	unregisterMetrics(p.c) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user