add functionality for periodic pushing metrics to remote storage via InitPush*
This commit is contained in:
parent
7fa54d40f0
commit
6cf96d45a0
@ -16,6 +16,9 @@
|
||||
* Allows exporting distinct metric sets via distinct endpoints. See [Set](http://godoc.org/github.com/VictoriaMetrics/metrics#Set).
|
||||
* Supports [easy-to-use histograms](http://godoc.org/github.com/VictoriaMetrics/metrics#Histogram), which just work without any tuning.
|
||||
Read more about VictoriaMetrics histograms at [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350).
|
||||
* Can push metrics to VictoriaMetrics or to any other remote storage, which accepts metrics
|
||||
in [Prometheus text exposition format](https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format).
|
||||
See [these docs](http://godoc.org/github.com/VictoriaMetrics/metrics#InitPush).
|
||||
|
||||
|
||||
### Limitations
|
||||
@ -64,6 +67,10 @@ func requestHandler() {
|
||||
http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
|
||||
metrics.WritePrometheus(w, true)
|
||||
})
|
||||
|
||||
// ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus
|
||||
// with the added `instance="foobar"` label to all the pushed metrics.
|
||||
metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true)
|
||||
```
|
||||
|
||||
See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info.
|
||||
|
141
push.go
Normal file
141
push.go
Normal file
@ -0,0 +1,141 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval.
|
||||
//
|
||||
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||
// to all the metrics before pushing them to pushURL.
|
||||
//
|
||||
// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
//
|
||||
// It is OK calling InitPushProcessMetrics multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels string) {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
WriteProcessMetrics(w)
|
||||
}
|
||||
initPush(pushURL, interval, extraLabels, writeMetrics)
|
||||
}
|
||||
|
||||
// InitPush sets up periodic push for globally registered metrics to the given pushURL with the given interval.
|
||||
//
|
||||
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||
// to all the metrics before pushing them to pushURL.
|
||||
//
|
||||
// If pushProcessMetrics is set to true, then 'process_*' metrics are also pushed to pushURL.
|
||||
//
|
||||
// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
//
|
||||
// It is OK calling InitPush multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func InitPush(pushURL string, interval time.Duration, extraLabels string, pushProcessMetrics bool) {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
defaultSet.WritePrometheus(w)
|
||||
if pushProcessMetrics {
|
||||
WriteProcessMetrics(w)
|
||||
}
|
||||
}
|
||||
initPush(pushURL, interval, extraLabels, writeMetrics)
|
||||
}
|
||||
|
||||
// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval.
|
||||
//
|
||||
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||
// to all the metrics before pushing them to pushURL.
|
||||
//
|
||||
/// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
//
|
||||
// It is OK calling InitPush multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels string) {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
s.WritePrometheus(w)
|
||||
}
|
||||
initPush(pushURL, interval, extraLabels, writeMetrics)
|
||||
}
|
||||
|
||||
func initPush(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) {
|
||||
if interval <= 0 {
|
||||
panic(fmt.Errorf("BUG: interval must be positive; got %s", interval))
|
||||
}
|
||||
if err := validateTags(extraLabels); err != nil {
|
||||
panic(fmt.Errorf("BUG: invalid extraLabels=%q: %s", extraLabels, err))
|
||||
}
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
var bb bytes.Buffer
|
||||
var tmpBuf []byte
|
||||
for range ticker.C {
|
||||
bb.Reset()
|
||||
writeMetrics(&bb)
|
||||
if len(extraLabels) > 0 {
|
||||
tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), extraLabels)
|
||||
bb.Reset()
|
||||
bb.Write(tmpBuf)
|
||||
}
|
||||
resp, err := http.Post(pushURL, "text/plain", &bb)
|
||||
if err != nil {
|
||||
log.Printf("cannot push metrics to %q: %s", pushURL, err)
|
||||
continue
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode/100 != 2 {
|
||||
log.Printf("unexpected status code in response from %q: %d; expecting 2xx", pushURL, resp.StatusCode)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func addExtraLabels(dst, src []byte, extraLabels string) []byte {
|
||||
for len(src) > 0 {
|
||||
var line []byte
|
||||
n := bytes.IndexByte(src, '\n')
|
||||
if n >= 0 {
|
||||
line = src[:n]
|
||||
src = src[n+1:]
|
||||
} else {
|
||||
line = src
|
||||
src = nil
|
||||
}
|
||||
n = bytes.IndexByte(line, '{')
|
||||
if n >= 0 {
|
||||
dst = append(dst, line[:n+1]...)
|
||||
dst = append(dst, extraLabels...)
|
||||
dst = append(dst, ',')
|
||||
dst = append(dst, line[n+1:]...)
|
||||
} else {
|
||||
n = bytes.LastIndexByte(line, ' ')
|
||||
if n < 0 {
|
||||
panic(fmt.Errorf("BUG: missing whitespace in the generated Prometheus text exposition line %q", line))
|
||||
}
|
||||
dst = append(dst, line[:n]...)
|
||||
dst = append(dst, '{')
|
||||
dst = append(dst, extraLabels...)
|
||||
dst = append(dst, '}')
|
||||
dst = append(dst, line[n:]...)
|
||||
}
|
||||
dst = append(dst, '\n')
|
||||
}
|
||||
return dst
|
||||
}
|
49
push_test.go
Normal file
49
push_test.go
Normal file
@ -0,0 +1,49 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestAddExtraLabels(t *testing.T) {
|
||||
f := func(s, extraLabels, expectedResult string) {
|
||||
t.Helper()
|
||||
result := addExtraLabels(nil, []byte(s), extraLabels)
|
||||
if string(result) != expectedResult {
|
||||
t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, expectedResult)
|
||||
}
|
||||
}
|
||||
f("", `foo="bar"`, "")
|
||||
f("a 123", `foo="bar"`, `a{foo="bar"} 123`+"\n")
|
||||
f(`a{b="c"} 1.3`, `foo="bar"`, `a{foo="bar",b="c"} 1.3`+"\n")
|
||||
f(`a{b="c}{"} 1.3`, `foo="bar",baz="x"`, `a{foo="bar",baz="x",b="c}{"} 1.3`+"\n")
|
||||
f(`foo 1
|
||||
bar{a="x"} 2
|
||||
`, `foo="bar"`, `foo{foo="bar"} 1
|
||||
bar{foo="bar",a="x"} 2
|
||||
`)
|
||||
}
|
||||
|
||||
func TestInitPushFailure(t *testing.T) {
|
||||
f := func(interval time.Duration, extraLabels string) {
|
||||
t.Helper()
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
panic("expecting non-nil error")
|
||||
}
|
||||
}()
|
||||
InitPush("http://foobar", interval, extraLabels, false)
|
||||
}
|
||||
|
||||
// Non-positive interval
|
||||
f(0, "")
|
||||
|
||||
// Invalid extraLabels
|
||||
f(time.Second, "foo")
|
||||
f(time.Second, "foo{bar")
|
||||
f(time.Second, "foo=bar")
|
||||
f(time.Second, "foo='bar'")
|
||||
f(time.Second, `foo="bar",baz`)
|
||||
f(time.Second, `{foo="bar"}`)
|
||||
f(time.Second, `a{foo="bar"}`)
|
||||
}
|
Loading…
Reference in New Issue
Block a user