added function for histograms merge

This commit is contained in:
AndrewChubatiuk 2024-05-19 11:06:13 +03:00
parent b6cce23ebe
commit e40cfe3ab5
No known key found for this signature in database
GPG Key ID: 96D776CC99880667
2 changed files with 56 additions and 5 deletions

View File

@ -47,7 +47,7 @@ var bucketMultiplier = math.Pow(10, 1.0/bucketsPerDecimal)
// Zero histogram is usable. // Zero histogram is usable.
type Histogram struct { type Histogram struct {
// Mu gurantees synchronous update for all the counters and sum. // Mu gurantees synchronous update for all the counters and sum.
mu sync.Mutex mu sync.RWMutex
decimalBuckets [decimalBucketsCount]*[bucketsPerDecimal]uint64 decimalBuckets [decimalBucketsCount]*[bucketsPerDecimal]uint64
@ -109,6 +109,28 @@ func (h *Histogram) Update(v float64) {
h.mu.Unlock() h.mu.Unlock()
} }
// Merge merges histograms
func (h *Histogram) Merge(b *Histogram) {
h.mu.Lock()
defer h.mu.Unlock()
b.mu.RLock()
defer b.mu.RUnlock()
h.lower += b.lower
h.upper += b.upper
h.sum += b.sum
for i, db := range b.decimalBuckets {
if db == nil {
continue
}
for j := range db {
h.decimalBuckets[i][j] = db[j]
}
}
}
// VisitNonZeroBuckets calls f for all buckets with non-zero counters. // VisitNonZeroBuckets calls f for all buckets with non-zero counters.
// //
// vmrange contains "<start>...<end>" string with bucket bounds. The lower bound // vmrange contains "<start>...<end>" string with bucket bounds. The lower bound
@ -116,7 +138,7 @@ func (h *Histogram) Update(v float64) {
// This is required to be compatible with Prometheus-style histogram buckets // This is required to be compatible with Prometheus-style histogram buckets
// with `le` (less or equal) labels. // with `le` (less or equal) labels.
func (h *Histogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) { func (h *Histogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) {
h.mu.Lock() h.mu.RLock()
if h.lower > 0 { if h.lower > 0 {
f(lowerBucketRange, h.lower) f(lowerBucketRange, h.lower)
} }
@ -135,7 +157,7 @@ func (h *Histogram) VisitNonZeroBuckets(f func(vmrange string, count uint64)) {
if h.upper > 0 { if h.upper > 0 {
f(upperBucketRange, h.upper) f(upperBucketRange, h.upper)
} }
h.mu.Unlock() h.mu.RUnlock()
} }
// NewHistogram creates and returns new histogram with the given name. // NewHistogram creates and returns new histogram with the given name.
@ -223,9 +245,9 @@ func (h *Histogram) marshalTo(prefix string, w io.Writer) {
} }
func (h *Histogram) getSum() float64 { func (h *Histogram) getSum() float64 {
h.mu.Lock() h.mu.RLock()
sum := h.sum sum := h.sum
h.mu.Unlock() h.mu.RUnlock()
return sum return sum
} }

View File

@ -10,6 +10,35 @@ import (
"time" "time"
) )
func TestHistogramMerge(t *testing.T) {
name := `TestHistogramMerge`
h := NewHistogram(name)
// Write data to histogram
for i := 98; i < 218; i++ {
h.Update(float64(i))
}
b := NewHistogram("test")
for i := 98; i < 218; i++ {
h.Update(float64(i))
}
h.Merge(b)
// Make sure the histogram prints <prefix>_bucket on marshalTo call
testMarshalTo(t, h, "prefix", `prefix_bucket{vmrange="8.799e+01...1.000e+02"} 6
prefix_bucket{vmrange="1.000e+02...1.136e+02"} 26
prefix_bucket{vmrange="1.136e+02...1.292e+02"} 32
prefix_bucket{vmrange="1.292e+02...1.468e+02"} 34
prefix_bucket{vmrange="1.468e+02...1.668e+02"} 40
prefix_bucket{vmrange="1.668e+02...1.896e+02"} 46
prefix_bucket{vmrange="1.896e+02...2.154e+02"} 52
prefix_bucket{vmrange="2.154e+02...2.448e+02"} 4
prefix_sum 37800
prefix_count 240
`)
}
func TestGetVMRange(t *testing.T) { func TestGetVMRange(t *testing.T) {
f := func(bucketIdx int, vmrangeExpected string) { f := func(bucketIdx int, vmrangeExpected string) {
t.Helper() t.Helper()