diff --git a/histogram.go b/histogram.go new file mode 100644 index 0000000..48056ec --- /dev/null +++ b/histogram.go @@ -0,0 +1,206 @@ +package metrics + +import ( + "fmt" + "io" + "math" + "sync" + "sync/atomic" + "time" +) + +// Histogram is a histogram that covers values with the following buckets: +// +// 0 +// (0...1e-9] +// (1e-9...2e-9] +// (2e-9...3e-9] +// ... +// (9e-9...1e-8] +// (1e-8...2e-8] +// ... +// (1e11...2e11] +// (2e11...3e11] +// ... +// (9e11...1e12] +// (1e12...Inf] +// +// Each bucket contains a counter for values in the given range. +// Each non-zero bucket is exposed with the following name: +// +// _vmbucket{,vmrange="..."} +// +// Where: +// +// - is the metric name passed to NewHistogram +// - is optional tags for the , which are passed to NewHistogram +// - and - start and end values for the given bucket +// - - the number of hits to the given bucket during Update* calls. +// +// Only non-zero buckets are exposed. +// +// Histogram buckets can be converted to Prometheus-like buckets in VictoriaMetrics +// with `prometheus_buckets(_vmbucket)`: +// +// histogram_quantile(0.95, prometheus_buckets(rate(request_duration_vmbucket[5m]))) +// +// Histogram cannot be used for negative values. +type Histogram struct { + buckets [bucketsCount]uint64 + + sumMu sync.Mutex + sum float64 + count uint64 +} + +// NewHistogram creates and returns new histogram with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// * foo +// * foo{bar="baz"} +// * foo{bar="baz",aaa="b"} +// +// The returned histogram is safe to use from concurrent goroutines. +func NewHistogram(name string) *Histogram { + return defaultSet.NewHistogram(name) +} + +// GetOrCreateHistogram returns registered histogram with the given name +// or creates new histogram if the registry doesn't contain histogram with +// the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// * foo +// * foo{bar="baz"} +// * foo{bar="baz",aaa="b"} +// +// The returned histogram is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewHistogram instead of GetOrCreateHistogram. +func GetOrCreateHistogram(name string) *Histogram { + return defaultSet.GetOrCreateHistogram(name) +} + +// Update updates h with v. +// +// v cannot be negative. +func (h *Histogram) Update(v float64) { + idx := getBucketIdx(v) + if idx >= uint(len(h.buckets)) { + panic(fmt.Errorf("BUG: idx cannot exceed %d; got %d", len(h.buckets), idx)) + } + atomic.AddUint64(&h.buckets[idx], 1) + atomic.AddUint64(&h.count, 1) + h.sumMu.Lock() + h.sum += v + h.sumMu.Unlock() +} + +// UpdateDuration updates request duration based on the given startTime. +func (h *Histogram) UpdateDuration(startTime time.Time) { + d := time.Since(startTime).Seconds() + h.Update(d) +} + +func (h *Histogram) marshalTo(prefix string, w io.Writer) { + count := atomic.LoadUint64(&h.count) + if count == 0 { + return + } + for i := range h.buckets[:] { + h.marshalBucket(prefix, w, i) + } + // Marshal `_sum` and `_count` metrics. + name, filters := splitMetricName(prefix) + h.sumMu.Lock() + sum := h.sum + h.sumMu.Unlock() + if float64(int64(sum)) == sum { + fmt.Fprintf(w, "%s_sum%s %d\n", name, filters, int64(sum)) + } else { + fmt.Fprintf(w, "%s_sum%s %g\n", name, filters, sum) + } + fmt.Fprintf(w, "%s_count%s %d\n", name, filters, count) +} + +func (h *Histogram) marshalBucket(prefix string, w io.Writer, idx int) { + v := h.buckets[idx] + if v == 0 { + return + } + start := float64(0) + if idx > 0 { + start = getRangeEndFromBucketIdx(uint(idx - 1)) + } + end := getRangeEndFromBucketIdx(uint(idx)) + tag := fmt.Sprintf(`vmrange="%g...%g"`, start, end) + prefix = addTag(prefix, tag) + name, filters := splitMetricName(prefix) + fmt.Fprintf(w, "%s_vmbucket%s %d\n", name, filters, v) +} + +func getBucketIdx(v float64) uint { + if v < 0 { + panic(fmt.Errorf("BUG: v cannot be negative; got %v", v)) + } + if v == 0 { + // Fast path for zero. + return 0 + } + if math.IsInf(v, 1) { + return bucketsCount - 1 + } + e10 := int(math.Floor(math.Log10(v))) + if e10 < e10Min { + return 1 + } + if e10 > e10Max { + if e10 == e10Max+1 && math.Pow10(e10) == v { + // Adjust m to be on par with Prometheus 'le' buckets (aka 'less or equal') + return bucketsCount - 2 + } + return bucketsCount - 1 + } + mf := v / math.Pow10(e10) + m := uint(mf) + // Handle possible rounding errors + if m < 1 { + m = 1 + } else if m > 9 { + m = 9 + } + if float64(m) == mf { + // Adjust m to be on par with Prometheus 'le' buckets (aka 'less or equal') + m-- + } + return 1 + m + uint(e10-e10Min)*9 +} + +func getRangeEndFromBucketIdx(idx uint) float64 { + if idx == 0 { + return 0 + } + if idx == 1 { + return math.Pow10(e10Min) + } + if idx >= bucketsCount-1 { + return math.Inf(1) + } + idx -= 2 + e10 := e10Min + int(idx/9) + m := 2 + (idx % 9) + return math.Pow10(e10) * float64(m) +} + +// Each range (10^n..10^(n+1)] for e10Min<=n<=e10Max is split into 9 equal sub-ranges, plus 3 additional buckets: +// - a bucket for zeros +// - a bucket for the range (0..10^e10Min] +// - a bucket for the range (10^(e10Max+1)..Inf] +const bucketsCount = 3 + 9*(1+e10Max-e10Min) + +const e10Min = -9 +const e10Max = 11 diff --git a/histogram_test.go b/histogram_test.go new file mode 100644 index 0000000..a4581ff --- /dev/null +++ b/histogram_test.go @@ -0,0 +1,194 @@ +package metrics + +import ( + "bytes" + "fmt" + "math" + "strings" + "testing" + "time" +) + +func TestHistogramUpdateNegativeValue(t *testing.T) { + h := NewHistogram("TestHisogramUpdateNegativeValue") + expectPanic(t, "negative value", func() { + h.Update(-123) + }) +} + +func TestGetBucketIdx(t *testing.T) { + f := func(v float64, idxExpected uint) { + t.Helper() + idx := getBucketIdx(v) + if idx != idxExpected { + t.Fatalf("unexpected getBucketIdx(%g); got %d; want %d", v, idx, idxExpected) + } + } + f(0, 0) + f(math.Pow10(e10Min-10), 1) + f(math.Pow10(e10Min-1), 1) + f(1.5*math.Pow10(e10Min-1), 1) + f(2*math.Pow10(e10Min-1), 1) + f(3*math.Pow10(e10Min-1), 1) + f(9*math.Pow10(e10Min-1), 1) + f(9.999*math.Pow10(e10Min-1), 1) + f(math.Pow10(e10Min), 1) + f(1.00001*math.Pow10(e10Min), 2) + f(1.5*math.Pow10(e10Min), 2) + f(1.999999*math.Pow10(e10Min), 2) + f(2*math.Pow10(e10Min), 2) + f(2.0000001*math.Pow10(e10Min), 3) + f(2.999*math.Pow10(e10Min), 3) + f(8.999*math.Pow10(e10Min), 9) + f(9*math.Pow10(e10Min), 9) + f(9.01*math.Pow10(e10Min), 10) + f(9.99999*math.Pow10(e10Min), 10) + f(math.Pow10(e10Min+1), 10) + f(1.9*math.Pow10(e10Min+1), 11) + f(9.9*math.Pow10(e10Min+1), 19) + f(math.Pow10(e10Min+2), 19) + f(math.Pow10(e10Min+3), 28) + f(5*math.Pow10(e10Min+3), 32) + f(0.1, 1-9*(e10Min+1)) + f(0.11, 2-9*(e10Min+1)) + f(0.95, 1-9*e10Min) + f(1, 1-9*e10Min) + f(2, 2-9*e10Min) + f(math.Pow10(e10Max), 1+9*(e10Max-e10Min)) + f(2*math.Pow10(e10Max), 2+9*(e10Max-e10Min)) + f(9.999*math.Pow10(e10Max), 10+9*(e10Max-e10Min)) + f(math.Pow10(e10Max+1), 10+9*(e10Max-e10Min)) + f(2*math.Pow10(e10Max+1), 11+9*(e10Max-e10Min)) + f(9*math.Pow10(e10Max+1), 11+9*(e10Max-e10Min)) + f(math.Pow10(e10Max+5), 11+9*(e10Max-e10Min)) + f(12.34*math.Pow10(e10Max+7), 11+9*(e10Max-e10Min)) + f(math.Inf(1), 11+9*(e10Max-e10Min)) +} + +func TestGetRangeEndFromBucketIdx(t *testing.T) { + f := func(idx uint, endExpected float64) { + t.Helper() + end := getRangeEndFromBucketIdx(idx) + if end != endExpected { + t.Fatalf("unexpected getRangeEndFromBucketIdx(%d); got %g; want %g", idx, end, endExpected) + } + } + f(0, 0) + f(1, math.Pow10(e10Min)) + f(2, 2*math.Pow10(e10Min)) + f(3, 3*math.Pow10(e10Min)) + f(9, 9*math.Pow10(e10Min)) + f(10, math.Pow10(e10Min+1)) + f(11, 2*math.Pow10(e10Min+1)) + f(16, 7*math.Pow10(e10Min+1)) + f(17, 8*math.Pow10(e10Min+1)) + f(18, 9*math.Pow10(e10Min+1)) + f(19, math.Pow10(e10Min+2)) + f(20, 2*math.Pow10(e10Min+2)) + f(21, 3*math.Pow10(e10Min+2)) + f(bucketsCount-21, 9*math.Pow10(e10Max-2)) + f(bucketsCount-20, math.Pow10(e10Max-1)) + f(bucketsCount-16, 5*math.Pow10(e10Max-1)) + f(bucketsCount-13, 8*math.Pow10(e10Max-1)) + f(bucketsCount-12, 9*math.Pow10(e10Max-1)) + f(bucketsCount-11, math.Pow10(e10Max)) + f(bucketsCount-10, 2*math.Pow10(e10Max)) + f(bucketsCount-4, 8*math.Pow10(e10Max)) + f(bucketsCount-3, 9*math.Pow10(e10Max)) + f(bucketsCount-2, math.Pow10(e10Max+1)) + f(bucketsCount-1, math.Inf(1)) +} + +func TestHistogramSerial(t *testing.T) { + name := `TestHistogramSerial` + h := NewHistogram(name) + + // Verify that the histogram is invisible in the output of WritePrometheus when it has no data. + var bb bytes.Buffer + WritePrometheus(&bb, false) + result := bb.String() + if strings.Contains(result, name) { + t.Fatalf("histogram %s shouldn't be visible in the WritePrometheus output; got\n%s", name, result) + } + + // Write data to histogram + for i := 84; i < 324; i++ { + h.Update(float64(i)) + } + + // Make sure the histogram prints _xbucket on marshalTo call + testMarshalTo(t, h, "prefix", "prefix_vmbucket{vmrange=\"80...90\"} 7\nprefix_vmbucket{vmrange=\"90...100\"} 10\nprefix_vmbucket{vmrange=\"100...200\"} 100\nprefix_vmbucket{vmrange=\"200...300\"} 100\nprefix_vmbucket{vmrange=\"300...400\"} 23\nprefix_sum 48840\nprefix_count 240\n") + testMarshalTo(t, h, ` m{foo="bar"}`, "\t m_vmbucket{foo=\"bar\",vmrange=\"80...90\"} 7\n\t m_vmbucket{foo=\"bar\",vmrange=\"90...100\"} 10\n\t m_vmbucket{foo=\"bar\",vmrange=\"100...200\"} 100\n\t m_vmbucket{foo=\"bar\",vmrange=\"200...300\"} 100\n\t m_vmbucket{foo=\"bar\",vmrange=\"300...400\"} 23\n\t m_sum{foo=\"bar\"} 48840\n\t m_count{foo=\"bar\"} 240\n") + + // Verify supported ranges + for i := -100; i < 100; i++ { + h.Update(1.23 * math.Pow10(i)) + } + h.UpdateDuration(time.Now().Add(-time.Minute)) + + // Make sure the histogram becomes visible in the output of WritePrometheus, + // since now it contains values. + bb.Reset() + WritePrometheus(&bb, false) + result = bb.String() + if !strings.Contains(result, name) { + t.Fatalf("missing histogram %s in the WritePrometheus output; got\n%s", name, result) + } +} + +func TestHistogramConcurrent(t *testing.T) { + name := "HistogramConcurrent" + h := NewHistogram(name) + err := testConcurrent(func() error { + for i := 0; i < 10; i++ { + h.Update(float64(i)) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + testMarshalTo(t, h, "prefix", "prefix_vmbucket{vmrange=\"0...0\"} 5\nprefix_vmbucket{vmrange=\"0.9...1\"} 5\nprefix_vmbucket{vmrange=\"1...2\"} 5\nprefix_vmbucket{vmrange=\"2...3\"} 5\nprefix_vmbucket{vmrange=\"3...4\"} 5\nprefix_vmbucket{vmrange=\"4...5\"} 5\nprefix_vmbucket{vmrange=\"5...6\"} 5\nprefix_vmbucket{vmrange=\"6...7\"} 5\nprefix_vmbucket{vmrange=\"7...8\"} 5\nprefix_vmbucket{vmrange=\"8...9\"} 5\nprefix_sum 225\nprefix_count 50\n") +} + +func TestHistogramWithTags(t *testing.T) { + name := `TestHistogram{tag="foo"}` + h := NewHistogram(name) + h.Update(123) + + var bb bytes.Buffer + WritePrometheus(&bb, false) + result := bb.String() + namePrefixWithTag := `TestHistogram_vmbucket{tag="foo",vmrange="100...200"} 1` + "\n" + if !strings.Contains(result, namePrefixWithTag) { + t.Fatalf("missing histogram %s in the WritePrometheus output; got\n%s", namePrefixWithTag, result) + } +} + +func TestGetOrCreateHistogramSerial(t *testing.T) { + name := "GetOrCreateHistogramSerial" + if err := testGetOrCreateHistogram(name); err != nil { + t.Fatal(err) + } +} + +func TestGetOrCreateHistogramConcurrent(t *testing.T) { + name := "GetOrCreateHistogramConcurrent" + err := testConcurrent(func() error { + return testGetOrCreateHistogram(name) + }) + if err != nil { + t.Fatal(err) + } +} + +func testGetOrCreateHistogram(name string) error { + h1 := GetOrCreateHistogram(name) + for i := 0; i < 10; i++ { + h2 := GetOrCreateHistogram(name) + if h1 != h2 { + return fmt.Errorf("unexpected histogram returned; got %p; want %p", h2, h1) + } + } + return nil +} diff --git a/histogram_timing_test.go b/histogram_timing_test.go new file mode 100644 index 0000000..cee323f --- /dev/null +++ b/histogram_timing_test.go @@ -0,0 +1,17 @@ +package metrics + +import ( + "testing" +) + +func BenchmarkHistogramUpdate(b *testing.B) { + h := GetOrCreateHistogram("BenchmarkHistogramUpdate") + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + h.Update(float64(i)) + i++ + } + }) +} diff --git a/metrics_test.go b/metrics_test.go index 8b00bea..7d30b01 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -16,6 +16,7 @@ func TestInvalidName(t *testing.T) { expectPanic(t, fmt.Sprintf("GetOrCreateCounter(%q)", name), func() { GetOrCreateCounter(name) }) expectPanic(t, fmt.Sprintf("GetOrCreateGauge(%q)", name), func() { GetOrCreateGauge(name, func() float64 { return 0 }) }) expectPanic(t, fmt.Sprintf("GetOrCreateSummary(%q)", name), func() { GetOrCreateSummary(name) }) + expectPanic(t, fmt.Sprintf("GetOrCreateHistogram(%q)", name), func() { GetOrCreateHistogram(name) }) } f("") f("foo{") @@ -45,6 +46,11 @@ func TestDoubleRegister(t *testing.T) { NewSummary(name) expectPanic(t, name, func() { NewSummary(name) }) }) + t.Run("NewHistogram", func(t *testing.T) { + name := "NewHistogramDoubleRegister" + NewHistogram(name) + expectPanic(t, name, func() { NewSummary(name) }) + }) } func TestGetOrCreateNotCounter(t *testing.T) { @@ -65,6 +71,12 @@ func TestGetOrCreateNotSummary(t *testing.T) { expectPanic(t, name, func() { GetOrCreateSummary(name) }) } +func TestGetOrCreateNotHistogram(t *testing.T) { + name := "GetOrCreateNotHistogram" + NewCounter(name) + expectPanic(t, name, func() { GetOrCreateHistogram(name) }) +} + func TestWritePrometheusSerial(t *testing.T) { if err := testWritePrometheus(); err != nil { t.Fatal(err) diff --git a/set.go b/set.go index e520656..fad7b09 100644 --- a/set.go +++ b/set.go @@ -45,6 +45,64 @@ func (s *Set) WritePrometheus(w io.Writer) { s.mu.Unlock() } +// NewHistogram creates and returns new histogram in s with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// * foo +// * foo{bar="baz"} +// * foo{bar="baz",aaa="b"} +// +// The returned histogram is safe to use from concurrent goroutines. +func (s *Set) NewHistogram(name string) *Histogram { + h := &Histogram{} + s.registerMetric(name, h) + return h +} + +// GetOrCreateHistogram returns registered histogram in s with the given name +// or creates new histogram if s doesn't contain histogram with the given name. +// +// name must be valid Prometheus-compatible metric with possible labels. +// For instance, +// +// * foo +// * foo{bar="baz"} +// * foo{bar="baz",aaa="b"} +// +// The returned histogram is safe to use from concurrent goroutines. +// +// Performance tip: prefer NewHistogram instead of GetOrCreateHistogram. +func (s *Set) GetOrCreateHistogram(name string) *Histogram { + s.mu.Lock() + nm := s.m[name] + s.mu.Unlock() + if nm == nil { + // Slow path - create and register missing histogram. + if err := validateMetric(name); err != nil { + panic(fmt.Errorf("BUG: invalid metric name %q: %s", name, err)) + } + nmNew := &namedMetric{ + name: name, + metric: &Histogram{}, + } + s.mu.Lock() + nm = s.m[name] + if nm == nil { + nm = nmNew + s.m[name] = nm + s.a = append(s.a, nm) + } + s.mu.Unlock() + } + h, ok := nm.metric.(*Histogram) + if !ok { + panic(fmt.Errorf("BUG: metric %q isn't a Histogram. It is %T", name, nm.metric)) + } + return h +} + // NewCounter registers and returns new counter with the given name in the s. // // name must be valid Prometheus-compatible metric with possible labels. diff --git a/set_test.go b/set_test.go index 7e9fda9..3187e4c 100644 --- a/set_test.go +++ b/set_test.go @@ -27,6 +27,10 @@ func TestNewSet(t *testing.T) { if sm == nil { t.Fatalf("NewSummary returned nil") } + h := s.NewHistogram(fmt.Sprintf("histogram_%d", j)) + if h == nil { + t.Fatalf("NewHistogram returned nil") + } } } }