From 42c28a8486fb3f180026c0b2c90643ac9e91951e Mon Sep 17 00:00:00 2001 From: Dmytro Kozlov Date: Fri, 15 Dec 2023 13:08:55 +0100 Subject: [PATCH] support push config for InitPush (#53) PushConfig represents a struct package uses for pushing metrics to remote destination. Having a structure helps to extend functionality in the future, without touching the signature of existing functions. For example, `PushConfig` supports custom HTTP headers via `Headers` param. Updates https://github.com/VictoriaMetrics/metrics/issues/52 Updates https://github.com/VictoriaMetrics/metrics/issues/36 Co-authored-by: hagen1778 --- README.md | 3 ++ config.go | 52 +++++++++++++++++++++++++++++++ config_test.go | 33 ++++++++++++++++++++ push.go | 63 ++++++++++++++++++++++++++------------ push_example_test.go | 73 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 205 insertions(+), 19 deletions(-) create mode 100644 config.go create mode 100644 config_test.go create mode 100644 push_example_test.go diff --git a/README.md b/README.md index e1a2537..7bb3885 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,9 @@ http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) { // ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus // with the added `instance="foobar"` label to all the pushed metrics. metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true) + +// ... or use metrics.PushConfig to see full list of configuration params +// metrics.InitPushWithConfig(metrics.PushConfig{URL: "http://victoria-metrics:8428/api/v1/import/prometheus", Interval: 10*time.Second}) ``` See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info. diff --git a/config.go b/config.go new file mode 100644 index 0000000..5fc345a --- /dev/null +++ b/config.go @@ -0,0 +1,52 @@ +package metrics + +import ( + "fmt" + "io" + "net/http" + "net/url" + "time" +) + +// PushConfig is config for pushing registered metrics to the given URL with the given Interval. +// +// URL and Interval are required fields +type PushConfig struct { + // URL defines URL where metrics would be pushed. + URL string + // Interval determines the frequency of pushing metrics. + Interval time.Duration + + // Headers contain optional http request Headers + Headers http.Header + // ExtraLabels may contain comma-separated list of `label="value"` labels, which will be added + // to all the metrics before pushing them to URL. + ExtraLabels string + // WriteMetricsFn is a callback to write metrics to w in Prometheus text exposition format without timestamps and trailing comments. + // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format + WriteMetricsFn func(w io.Writer) + + pushURL *url.URL +} + +// Validate validates correctness of PushConfig fields +func (pc *PushConfig) Validate() error { + if pc.Interval <= 0 { + return fmt.Errorf("invalid Interval=%s: must be positive", pc.Interval) + } + if err := validateTags(pc.ExtraLabels); err != nil { + return fmt.Errorf("invalid ExtraLabels=%q: %w", pc.ExtraLabels, err) + } + pu, err := url.Parse(pc.URL) + if err != nil { + return fmt.Errorf("cannot parse URL=%q: %w", pc.URL, err) + } + if pu.Scheme != "http" && pu.Scheme != "https" { + return fmt.Errorf("unsupported scheme in URL=%q; expecting 'http' or 'https'", pc.URL) + } + if pu.Host == "" { + return fmt.Errorf("missing host in URL=%q", pc.URL) + } + pc.pushURL = pu + return nil +} diff --git a/config_test.go b/config_test.go new file mode 100644 index 0000000..b808d4a --- /dev/null +++ b/config_test.go @@ -0,0 +1,33 @@ +package metrics + +import ( + "testing" + "time" +) + +func TestPushConfigValidateError(t *testing.T) { + f := func(config *PushConfig) { + t.Helper() + if err := config.Validate(); err == nil { + t.Fatalf("expecting non-nil error when validating %v", config) + } + } + + f(&PushConfig{}) + f(&PushConfig{URL: "", Interval: time.Second}) + f(&PushConfig{URL: "https://localhost:8080", Interval: -1 * time.Second}) + f(&PushConfig{URL: "htt://localhost:8080", Interval: time.Second}) + f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second, ExtraLabels: "a{} "}) +} + +func TestPushConfigValidateSuccess(t *testing.T) { + f := func(config *PushConfig) { + t.Helper() + if err := config.Validate(); err != nil { + t.Fatalf("expecting nil error when validating %v; err: %s", config, err) + } + } + + f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second}) + f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second, ExtraLabels: `foo="bar"`}) +} diff --git a/push.go b/push.go index 4215f48..2d57765 100644 --- a/push.go +++ b/push.go @@ -4,10 +4,8 @@ import ( "bytes" "fmt" "io" - "io/ioutil" "log" "net/http" - "net/url" "time" "compress/gzip" @@ -90,25 +88,45 @@ func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels strin // in this case metrics are pushed to all the provided pushURL urls. // // It is OK calling InitPushExt multiple times with different writeMetrics - -// in this case all the metrics generated by writeMetrics callbacks are writte to pushURL. +// in this case all the metrics generated by writeMetrics callbacks are written to pushURL. func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error { - if interval <= 0 { - return fmt.Errorf("interval must be positive; got %s", interval) + cfg := PushConfig{ + URL: pushURL, + Interval: interval, + ExtraLabels: extraLabels, + WriteMetricsFn: writeMetrics, } - if err := validateTags(extraLabels); err != nil { - return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err) + return InitPushWithConfig(cfg) +} + +// InitPushWithConfig sets up periodic push for metrics based on params from passed PushConfig. +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +// +// It is OK calling InitPushWithConfig multiple times with different PushConfig.URL - +// in this case metrics are pushed to all the provided PushConfig.URL urls. +// +// It is OK calling InitPushWithConfig multiple times with different PushConfig.WriteMetricsFn - +// in this case all the metrics generated by PushConfig.WriteMetricsFn callbacks are written to PushConfig.URL. +// +// If +func InitPushWithConfig(cfg PushConfig) error { + if err := cfg.Validate(); err != nil { + return fmt.Errorf("failed to validate PushConfig: %w", err) } - pu, err := url.Parse(pushURL) - if err != nil { - return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err) + + writeMetrics := cfg.WriteMetricsFn + if writeMetrics == nil { + writeMetrics = func(w io.Writer) { + WritePrometheus(w, true) + } } - if pu.Scheme != "http" && pu.Scheme != "https" { - return fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL) - } - if pu.Host == "" { - return fmt.Errorf("missing host in pushURL=%q", pushURL) - } - pushURLRedacted := pu.Redacted() + + pushURL := cfg.pushURL + pushURLRedacted := pushURL.Redacted() + + interval := cfg.Interval c := &http.Client{ Timeout: interval, } @@ -118,6 +136,8 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted)) pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted)) pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds()) + + extraLabels := cfg.ExtraLabels go func() { ticker := time.NewTicker(interval) var bb bytes.Buffer @@ -146,12 +166,17 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri blockLen := bb.Len() bytesPushedTotal.Add(blockLen) pushBlockSize.Update(float64(blockLen)) - req, err := http.NewRequest("GET", pushURL, &bb) + req, err := http.NewRequest("GET", pushURL.String(), &bb) if err != nil { panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err)) } req.Header.Set("Content-Type", "text/plain") req.Header.Set("Content-Encoding", "gzip") + for k, vs := range cfg.Headers { + for _, v := range vs { + req.Header.Add(k, v) + } + } startTime := time.Now() resp, err := c.Do(req) pushDuration.UpdateDuration(startTime) @@ -161,7 +186,7 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri continue } if resp.StatusCode/100 != 2 { - body, _ := ioutil.ReadAll(resp.Body) + body, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q", pushURLRedacted, resp.StatusCode, body) diff --git a/push_example_test.go b/push_example_test.go new file mode 100644 index 0000000..590aff9 --- /dev/null +++ b/push_example_test.go @@ -0,0 +1,73 @@ +package metrics_test + +import ( + "compress/gzip" + "fmt" + "io" + "net/http" + "net/http/httptest" + "time" + + "github.com/VictoriaMetrics/metrics" +) + +func ExampleInitPushWithConfig() { + syncCh := make(chan string) + srv := newServer(syncCh) + defer srv.Close() + + cfg := metrics.PushConfig{ + URL: srv.URL, + Interval: time.Millisecond * 100, + WriteMetricsFn: func(w io.Writer) { + fmt.Fprint(w, "foo{label=\"bar\"} 1\n") + fmt.Fprint(w, "foo{label=\"baz\"} 2\n") + }, + } + if err := metrics.InitPushWithConfig(cfg); err != nil { + panic(fmt.Sprintf("BUG: unexpected error: %s", err)) + } + fmt.Println(<-syncCh) + + // Output: + // foo{label="bar"} 1 + // foo{label="baz"} 2 +} + +func ExampleInitPushExt() { + syncCh := make(chan string) + srv := newServer(syncCh) + defer srv.Close() + + writeFn := func(w io.Writer) { + fmt.Fprint(w, "foo{label=\"bar\"} 11\n") + fmt.Fprint(w, "foo{label=\"baz\"} 22\n") + } + + err := metrics.InitPushExt(srv.URL, time.Millisecond*100, "", writeFn) + if err != nil { + panic(fmt.Sprintf("BUG: unexpected error: %s", err)) + } + fmt.Println(<-syncCh) + + // Output: + // foo{label="bar"} 11 + // foo{label="baz"} 22 +} + +func newServer(ch chan string) *httptest.Server { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gr, err := gzip.NewReader(r.Body) + if err != nil { + panic(fmt.Sprintf("BUG: unexpected error: %s", err)) + } + defer gr.Close() + + b, err := io.ReadAll(gr) + if err != nil { + panic(fmt.Sprintf("BUG: unexpected error: %s", err)) + } + ch <- string(b) + })) + return srv +}