support push config for InitPush (#53)
PushConfig represents a struct package uses for pushing metrics to remote destination. Having a structure helps to extend functionality in the future, without touching the signature of existing functions. For example, `PushConfig` supports custom HTTP headers via `Headers` param. Updates https://github.com/VictoriaMetrics/metrics/issues/52 Updates https://github.com/VictoriaMetrics/metrics/issues/36 Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
2ec14979a8
commit
42c28a8486
@ -71,6 +71,9 @@ http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
|
|||||||
// ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus
|
// ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus
|
||||||
// with the added `instance="foobar"` label to all the pushed metrics.
|
// with the added `instance="foobar"` label to all the pushed metrics.
|
||||||
metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true)
|
metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true)
|
||||||
|
|
||||||
|
// ... or use metrics.PushConfig to see full list of configuration params
|
||||||
|
// metrics.InitPushWithConfig(metrics.PushConfig{URL: "http://victoria-metrics:8428/api/v1/import/prometheus", Interval: 10*time.Second})
|
||||||
```
|
```
|
||||||
|
|
||||||
See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info.
|
See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info.
|
||||||
|
52
config.go
Normal file
52
config.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PushConfig is config for pushing registered metrics to the given URL with the given Interval.
|
||||||
|
//
|
||||||
|
// URL and Interval are required fields
|
||||||
|
type PushConfig struct {
|
||||||
|
// URL defines URL where metrics would be pushed.
|
||||||
|
URL string
|
||||||
|
// Interval determines the frequency of pushing metrics.
|
||||||
|
Interval time.Duration
|
||||||
|
|
||||||
|
// Headers contain optional http request Headers
|
||||||
|
Headers http.Header
|
||||||
|
// ExtraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||||
|
// to all the metrics before pushing them to URL.
|
||||||
|
ExtraLabels string
|
||||||
|
// WriteMetricsFn is a callback to write metrics to w in Prometheus text exposition format without timestamps and trailing comments.
|
||||||
|
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||||
|
WriteMetricsFn func(w io.Writer)
|
||||||
|
|
||||||
|
pushURL *url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate validates correctness of PushConfig fields
|
||||||
|
func (pc *PushConfig) Validate() error {
|
||||||
|
if pc.Interval <= 0 {
|
||||||
|
return fmt.Errorf("invalid Interval=%s: must be positive", pc.Interval)
|
||||||
|
}
|
||||||
|
if err := validateTags(pc.ExtraLabels); err != nil {
|
||||||
|
return fmt.Errorf("invalid ExtraLabels=%q: %w", pc.ExtraLabels, err)
|
||||||
|
}
|
||||||
|
pu, err := url.Parse(pc.URL)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot parse URL=%q: %w", pc.URL, err)
|
||||||
|
}
|
||||||
|
if pu.Scheme != "http" && pu.Scheme != "https" {
|
||||||
|
return fmt.Errorf("unsupported scheme in URL=%q; expecting 'http' or 'https'", pc.URL)
|
||||||
|
}
|
||||||
|
if pu.Host == "" {
|
||||||
|
return fmt.Errorf("missing host in URL=%q", pc.URL)
|
||||||
|
}
|
||||||
|
pc.pushURL = pu
|
||||||
|
return nil
|
||||||
|
}
|
33
config_test.go
Normal file
33
config_test.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPushConfigValidateError(t *testing.T) {
|
||||||
|
f := func(config *PushConfig) {
|
||||||
|
t.Helper()
|
||||||
|
if err := config.Validate(); err == nil {
|
||||||
|
t.Fatalf("expecting non-nil error when validating %v", config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f(&PushConfig{})
|
||||||
|
f(&PushConfig{URL: "", Interval: time.Second})
|
||||||
|
f(&PushConfig{URL: "https://localhost:8080", Interval: -1 * time.Second})
|
||||||
|
f(&PushConfig{URL: "htt://localhost:8080", Interval: time.Second})
|
||||||
|
f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second, ExtraLabels: "a{} "})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushConfigValidateSuccess(t *testing.T) {
|
||||||
|
f := func(config *PushConfig) {
|
||||||
|
t.Helper()
|
||||||
|
if err := config.Validate(); err != nil {
|
||||||
|
t.Fatalf("expecting nil error when validating %v; err: %s", config, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second})
|
||||||
|
f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second, ExtraLabels: `foo="bar"`})
|
||||||
|
}
|
59
push.go
59
push.go
@ -4,10 +4,8 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
@ -90,25 +88,45 @@ func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels strin
|
|||||||
// in this case metrics are pushed to all the provided pushURL urls.
|
// in this case metrics are pushed to all the provided pushURL urls.
|
||||||
//
|
//
|
||||||
// It is OK calling InitPushExt multiple times with different writeMetrics -
|
// It is OK calling InitPushExt multiple times with different writeMetrics -
|
||||||
// in this case all the metrics generated by writeMetrics callbacks are writte to pushURL.
|
// in this case all the metrics generated by writeMetrics callbacks are written to pushURL.
|
||||||
func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error {
|
func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error {
|
||||||
if interval <= 0 {
|
cfg := PushConfig{
|
||||||
return fmt.Errorf("interval must be positive; got %s", interval)
|
URL: pushURL,
|
||||||
|
Interval: interval,
|
||||||
|
ExtraLabels: extraLabels,
|
||||||
|
WriteMetricsFn: writeMetrics,
|
||||||
}
|
}
|
||||||
if err := validateTags(extraLabels); err != nil {
|
return InitPushWithConfig(cfg)
|
||||||
return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
|
|
||||||
}
|
}
|
||||||
pu, err := url.Parse(pushURL)
|
|
||||||
if err != nil {
|
// InitPushWithConfig sets up periodic push for metrics based on params from passed PushConfig.
|
||||||
return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
|
//
|
||||||
|
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||||
|
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||||
|
//
|
||||||
|
// It is OK calling InitPushWithConfig multiple times with different PushConfig.URL -
|
||||||
|
// in this case metrics are pushed to all the provided PushConfig.URL urls.
|
||||||
|
//
|
||||||
|
// It is OK calling InitPushWithConfig multiple times with different PushConfig.WriteMetricsFn -
|
||||||
|
// in this case all the metrics generated by PushConfig.WriteMetricsFn callbacks are written to PushConfig.URL.
|
||||||
|
//
|
||||||
|
// If
|
||||||
|
func InitPushWithConfig(cfg PushConfig) error {
|
||||||
|
if err := cfg.Validate(); err != nil {
|
||||||
|
return fmt.Errorf("failed to validate PushConfig: %w", err)
|
||||||
}
|
}
|
||||||
if pu.Scheme != "http" && pu.Scheme != "https" {
|
|
||||||
return fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL)
|
writeMetrics := cfg.WriteMetricsFn
|
||||||
|
if writeMetrics == nil {
|
||||||
|
writeMetrics = func(w io.Writer) {
|
||||||
|
WritePrometheus(w, true)
|
||||||
}
|
}
|
||||||
if pu.Host == "" {
|
|
||||||
return fmt.Errorf("missing host in pushURL=%q", pushURL)
|
|
||||||
}
|
}
|
||||||
pushURLRedacted := pu.Redacted()
|
|
||||||
|
pushURL := cfg.pushURL
|
||||||
|
pushURLRedacted := pushURL.Redacted()
|
||||||
|
|
||||||
|
interval := cfg.Interval
|
||||||
c := &http.Client{
|
c := &http.Client{
|
||||||
Timeout: interval,
|
Timeout: interval,
|
||||||
}
|
}
|
||||||
@ -118,6 +136,8 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
|
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
|
||||||
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
|
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
|
||||||
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
|
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
|
||||||
|
|
||||||
|
extraLabels := cfg.ExtraLabels
|
||||||
go func() {
|
go func() {
|
||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
var bb bytes.Buffer
|
var bb bytes.Buffer
|
||||||
@ -146,12 +166,17 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
blockLen := bb.Len()
|
blockLen := bb.Len()
|
||||||
bytesPushedTotal.Add(blockLen)
|
bytesPushedTotal.Add(blockLen)
|
||||||
pushBlockSize.Update(float64(blockLen))
|
pushBlockSize.Update(float64(blockLen))
|
||||||
req, err := http.NewRequest("GET", pushURL, &bb)
|
req, err := http.NewRequest("GET", pushURL.String(), &bb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
|
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "text/plain")
|
req.Header.Set("Content-Type", "text/plain")
|
||||||
req.Header.Set("Content-Encoding", "gzip")
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
|
for k, vs := range cfg.Headers {
|
||||||
|
for _, v := range vs {
|
||||||
|
req.Header.Add(k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
resp, err := c.Do(req)
|
resp, err := c.Do(req)
|
||||||
pushDuration.UpdateDuration(startTime)
|
pushDuration.UpdateDuration(startTime)
|
||||||
@ -161,7 +186,7 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if resp.StatusCode/100 != 2 {
|
if resp.StatusCode/100 != 2 {
|
||||||
body, _ := ioutil.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
|
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
|
||||||
pushURLRedacted, resp.StatusCode, body)
|
pushURLRedacted, resp.StatusCode, body)
|
||||||
|
73
push_example_test.go
Normal file
73
push_example_test.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
package metrics_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/gzip"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ExampleInitPushWithConfig() {
|
||||||
|
syncCh := make(chan string)
|
||||||
|
srv := newServer(syncCh)
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
cfg := metrics.PushConfig{
|
||||||
|
URL: srv.URL,
|
||||||
|
Interval: time.Millisecond * 100,
|
||||||
|
WriteMetricsFn: func(w io.Writer) {
|
||||||
|
fmt.Fprint(w, "foo{label=\"bar\"} 1\n")
|
||||||
|
fmt.Fprint(w, "foo{label=\"baz\"} 2\n")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := metrics.InitPushWithConfig(cfg); err != nil {
|
||||||
|
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
|
||||||
|
}
|
||||||
|
fmt.Println(<-syncCh)
|
||||||
|
|
||||||
|
// Output:
|
||||||
|
// foo{label="bar"} 1
|
||||||
|
// foo{label="baz"} 2
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleInitPushExt() {
|
||||||
|
syncCh := make(chan string)
|
||||||
|
srv := newServer(syncCh)
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
writeFn := func(w io.Writer) {
|
||||||
|
fmt.Fprint(w, "foo{label=\"bar\"} 11\n")
|
||||||
|
fmt.Fprint(w, "foo{label=\"baz\"} 22\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := metrics.InitPushExt(srv.URL, time.Millisecond*100, "", writeFn)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
|
||||||
|
}
|
||||||
|
fmt.Println(<-syncCh)
|
||||||
|
|
||||||
|
// Output:
|
||||||
|
// foo{label="bar"} 11
|
||||||
|
// foo{label="baz"} 22
|
||||||
|
}
|
||||||
|
|
||||||
|
func newServer(ch chan string) *httptest.Server {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
gr, err := gzip.NewReader(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
|
||||||
|
}
|
||||||
|
defer gr.Close()
|
||||||
|
|
||||||
|
b, err := io.ReadAll(gr)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
|
||||||
|
}
|
||||||
|
ch <- string(b)
|
||||||
|
}))
|
||||||
|
return srv
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user