From 6cf96d45a0caa6a4a99fb3c5481ed7fd6f198fb9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 20 Jul 2022 16:53:12 +0300 Subject: [PATCH] add functionality for periodic pushing metrics to remote storage via InitPush* --- README.md | 7 +++ push.go | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++ push_test.go | 49 ++++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 push.go create mode 100644 push_test.go diff --git a/README.md b/README.md index a5d7e6e..e1a2537 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,9 @@ * Allows exporting distinct metric sets via distinct endpoints. See [Set](http://godoc.org/github.com/VictoriaMetrics/metrics#Set). * Supports [easy-to-use histograms](http://godoc.org/github.com/VictoriaMetrics/metrics#Histogram), which just work without any tuning. Read more about VictoriaMetrics histograms at [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). +* Can push metrics to VictoriaMetrics or to any other remote storage, which accepts metrics + in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format). + See [these docs](http://godoc.org/github.com/VictoriaMetrics/metrics#InitPush). ### Limitations @@ -64,6 +67,10 @@ func requestHandler() { http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) { metrics.WritePrometheus(w, true) }) + +// ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus +// with the added `instance="foobar"` label to all the pushed metrics. +metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true) ``` See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info. diff --git a/push.go b/push.go new file mode 100644 index 0000000..121971d --- /dev/null +++ b/push.go @@ -0,0 +1,141 @@ +package metrics + +import ( + "bytes" + "fmt" + "io" + "log" + "net/http" + "time" +) + +// InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval. +// +// extraLabels may contain comma-separated list of `label="value"` labels, which will be added +// to all the metrics before pushing them to pushURL. +// +// The metrics are pushed to pushURL in Prometheus text exposition format. +// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +// +// It is OK calling InitPushProcessMetrics multiple times with different pushURL - +// in this case metrics are pushed to all the provided pushURL urls. +func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels string) { + writeMetrics := func(w io.Writer) { + WriteProcessMetrics(w) + } + initPush(pushURL, interval, extraLabels, writeMetrics) +} + +// InitPush sets up periodic push for globally registered metrics to the given pushURL with the given interval. +// +// extraLabels may contain comma-separated list of `label="value"` labels, which will be added +// to all the metrics before pushing them to pushURL. +// +// If pushProcessMetrics is set to true, then 'process_*' metrics are also pushed to pushURL. +// +// The metrics are pushed to pushURL in Prometheus text exposition format. +// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +// +// It is OK calling InitPush multiple times with different pushURL - +// in this case metrics are pushed to all the provided pushURL urls. +func InitPush(pushURL string, interval time.Duration, extraLabels string, pushProcessMetrics bool) { + writeMetrics := func(w io.Writer) { + defaultSet.WritePrometheus(w) + if pushProcessMetrics { + WriteProcessMetrics(w) + } + } + initPush(pushURL, interval, extraLabels, writeMetrics) +} + +// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval. +// +// extraLabels may contain comma-separated list of `label="value"` labels, which will be added +// to all the metrics before pushing them to pushURL. +// +/// The metrics are pushed to pushURL in Prometheus text exposition format. +// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +// +// It is OK calling InitPush multiple times with different pushURL - +// in this case metrics are pushed to all the provided pushURL urls. +func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels string) { + writeMetrics := func(w io.Writer) { + s.WritePrometheus(w) + } + initPush(pushURL, interval, extraLabels, writeMetrics) +} + +func initPush(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) { + if interval <= 0 { + panic(fmt.Errorf("BUG: interval must be positive; got %s", interval)) + } + if err := validateTags(extraLabels); err != nil { + panic(fmt.Errorf("BUG: invalid extraLabels=%q: %s", extraLabels, err)) + } + go func() { + ticker := time.NewTicker(interval) + var bb bytes.Buffer + var tmpBuf []byte + for range ticker.C { + bb.Reset() + writeMetrics(&bb) + if len(extraLabels) > 0 { + tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), extraLabels) + bb.Reset() + bb.Write(tmpBuf) + } + resp, err := http.Post(pushURL, "text/plain", &bb) + if err != nil { + log.Printf("cannot push metrics to %q: %s", pushURL, err) + continue + } + _ = resp.Body.Close() + if resp.StatusCode/100 != 2 { + log.Printf("unexpected status code in response from %q: %d; expecting 2xx", pushURL, resp.StatusCode) + continue + } + } + }() +} + +func addExtraLabels(dst, src []byte, extraLabels string) []byte { + for len(src) > 0 { + var line []byte + n := bytes.IndexByte(src, '\n') + if n >= 0 { + line = src[:n] + src = src[n+1:] + } else { + line = src + src = nil + } + n = bytes.IndexByte(line, '{') + if n >= 0 { + dst = append(dst, line[:n+1]...) + dst = append(dst, extraLabels...) + dst = append(dst, ',') + dst = append(dst, line[n+1:]...) + } else { + n = bytes.LastIndexByte(line, ' ') + if n < 0 { + panic(fmt.Errorf("BUG: missing whitespace in the generated Prometheus text exposition line %q", line)) + } + dst = append(dst, line[:n]...) + dst = append(dst, '{') + dst = append(dst, extraLabels...) + dst = append(dst, '}') + dst = append(dst, line[n:]...) + } + dst = append(dst, '\n') + } + return dst +} diff --git a/push_test.go b/push_test.go new file mode 100644 index 0000000..28a9c74 --- /dev/null +++ b/push_test.go @@ -0,0 +1,49 @@ +package metrics + +import ( + "testing" + "time" +) + +func TestAddExtraLabels(t *testing.T) { + f := func(s, extraLabels, expectedResult string) { + t.Helper() + result := addExtraLabels(nil, []byte(s), extraLabels) + if string(result) != expectedResult { + t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, expectedResult) + } + } + f("", `foo="bar"`, "") + f("a 123", `foo="bar"`, `a{foo="bar"} 123`+"\n") + f(`a{b="c"} 1.3`, `foo="bar"`, `a{foo="bar",b="c"} 1.3`+"\n") + f(`a{b="c}{"} 1.3`, `foo="bar",baz="x"`, `a{foo="bar",baz="x",b="c}{"} 1.3`+"\n") + f(`foo 1 +bar{a="x"} 2 +`, `foo="bar"`, `foo{foo="bar"} 1 +bar{foo="bar",a="x"} 2 +`) +} + +func TestInitPushFailure(t *testing.T) { + f := func(interval time.Duration, extraLabels string) { + t.Helper() + defer func() { + if err := recover(); err == nil { + panic("expecting non-nil error") + } + }() + InitPush("http://foobar", interval, extraLabels, false) + } + + // Non-positive interval + f(0, "") + + // Invalid extraLabels + f(time.Second, "foo") + f(time.Second, "foo{bar") + f(time.Second, "foo=bar") + f(time.Second, "foo='bar'") + f(time.Second, `foo="bar",baz`) + f(time.Second, `{foo="bar"}`) + f(time.Second, `a{foo="bar"}`) +}