Revert "support push config for InitPush (#53)"

This reverts commit 42c28a8486.

Reason for revert: the provided additional public API looks not very good for future support.
See https://github.com/VictoriaMetrics/metrics/pull/53#pullrequestreview-1785442537 for details.
This commit is contained in:
Aliaksandr Valialkin 2023-12-17 15:39:32 +02:00
parent 42c28a8486
commit cd448dd4c1
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
5 changed files with 19 additions and 205 deletions

View File

@ -71,9 +71,6 @@ http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
// ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus // ... or push registered metrics every 10 seconds to http://victoria-metrics:8428/api/v1/import/prometheus
// with the added `instance="foobar"` label to all the pushed metrics. // with the added `instance="foobar"` label to all the pushed metrics.
metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true) metrics.InitPush("http://victoria-metrics:8428/api/v1/import/prometheus", 10*time.Second, `instance="foobar"`, true)
// ... or use metrics.PushConfig to see full list of configuration params
// metrics.InitPushWithConfig(metrics.PushConfig{URL: "http://victoria-metrics:8428/api/v1/import/prometheus", Interval: 10*time.Second})
``` ```
See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info. See [docs](http://godoc.org/github.com/VictoriaMetrics/metrics) for more info.

View File

@ -1,52 +0,0 @@
package metrics
import (
"fmt"
"io"
"net/http"
"net/url"
"time"
)
// PushConfig is config for pushing registered metrics to the given URL with the given Interval.
//
// URL and Interval are required fields
type PushConfig struct {
// URL defines URL where metrics would be pushed.
URL string
// Interval determines the frequency of pushing metrics.
Interval time.Duration
// Headers contain optional http request Headers
Headers http.Header
// ExtraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to URL.
ExtraLabels string
// WriteMetricsFn is a callback to write metrics to w in Prometheus text exposition format without timestamps and trailing comments.
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
WriteMetricsFn func(w io.Writer)
pushURL *url.URL
}
// Validate validates correctness of PushConfig fields
func (pc *PushConfig) Validate() error {
if pc.Interval <= 0 {
return fmt.Errorf("invalid Interval=%s: must be positive", pc.Interval)
}
if err := validateTags(pc.ExtraLabels); err != nil {
return fmt.Errorf("invalid ExtraLabels=%q: %w", pc.ExtraLabels, err)
}
pu, err := url.Parse(pc.URL)
if err != nil {
return fmt.Errorf("cannot parse URL=%q: %w", pc.URL, err)
}
if pu.Scheme != "http" && pu.Scheme != "https" {
return fmt.Errorf("unsupported scheme in URL=%q; expecting 'http' or 'https'", pc.URL)
}
if pu.Host == "" {
return fmt.Errorf("missing host in URL=%q", pc.URL)
}
pc.pushURL = pu
return nil
}

View File

@ -1,33 +0,0 @@
package metrics
import (
"testing"
"time"
)
func TestPushConfigValidateError(t *testing.T) {
f := func(config *PushConfig) {
t.Helper()
if err := config.Validate(); err == nil {
t.Fatalf("expecting non-nil error when validating %v", config)
}
}
f(&PushConfig{})
f(&PushConfig{URL: "", Interval: time.Second})
f(&PushConfig{URL: "https://localhost:8080", Interval: -1 * time.Second})
f(&PushConfig{URL: "htt://localhost:8080", Interval: time.Second})
f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second, ExtraLabels: "a{} "})
}
func TestPushConfigValidateSuccess(t *testing.T) {
f := func(config *PushConfig) {
t.Helper()
if err := config.Validate(); err != nil {
t.Fatalf("expecting nil error when validating %v; err: %s", config, err)
}
}
f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second})
f(&PushConfig{URL: "http://localhost:8080", Interval: time.Second, ExtraLabels: `foo="bar"`})
}

61
push.go
View File

@ -4,8 +4,10 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"net/http" "net/http"
"net/url"
"time" "time"
"compress/gzip" "compress/gzip"
@ -88,45 +90,25 @@ func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels strin
// in this case metrics are pushed to all the provided pushURL urls. // in this case metrics are pushed to all the provided pushURL urls.
// //
// It is OK calling InitPushExt multiple times with different writeMetrics - // It is OK calling InitPushExt multiple times with different writeMetrics -
// in this case all the metrics generated by writeMetrics callbacks are written to pushURL. // in this case all the metrics generated by writeMetrics callbacks are writte to pushURL.
func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error { func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error {
cfg := PushConfig{ if interval <= 0 {
URL: pushURL, return fmt.Errorf("interval must be positive; got %s", interval)
Interval: interval,
ExtraLabels: extraLabels,
WriteMetricsFn: writeMetrics,
} }
return InitPushWithConfig(cfg) if err := validateTags(extraLabels); err != nil {
} return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
// InitPushWithConfig sets up periodic push for metrics based on params from passed PushConfig.
//
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
//
// It is OK calling InitPushWithConfig multiple times with different PushConfig.URL -
// in this case metrics are pushed to all the provided PushConfig.URL urls.
//
// It is OK calling InitPushWithConfig multiple times with different PushConfig.WriteMetricsFn -
// in this case all the metrics generated by PushConfig.WriteMetricsFn callbacks are written to PushConfig.URL.
//
// If
func InitPushWithConfig(cfg PushConfig) error {
if err := cfg.Validate(); err != nil {
return fmt.Errorf("failed to validate PushConfig: %w", err)
} }
pu, err := url.Parse(pushURL)
writeMetrics := cfg.WriteMetricsFn if err != nil {
if writeMetrics == nil { return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
writeMetrics = func(w io.Writer) {
WritePrometheus(w, true)
} }
if pu.Scheme != "http" && pu.Scheme != "https" {
return fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL)
} }
if pu.Host == "" {
pushURL := cfg.pushURL return fmt.Errorf("missing host in pushURL=%q", pushURL)
pushURLRedacted := pushURL.Redacted() }
pushURLRedacted := pu.Redacted()
interval := cfg.Interval
c := &http.Client{ c := &http.Client{
Timeout: interval, Timeout: interval,
} }
@ -136,8 +118,6 @@ func InitPushWithConfig(cfg PushConfig) error {
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted)) pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted)) pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds()) pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
extraLabels := cfg.ExtraLabels
go func() { go func() {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
var bb bytes.Buffer var bb bytes.Buffer
@ -166,17 +146,12 @@ func InitPushWithConfig(cfg PushConfig) error {
blockLen := bb.Len() blockLen := bb.Len()
bytesPushedTotal.Add(blockLen) bytesPushedTotal.Add(blockLen)
pushBlockSize.Update(float64(blockLen)) pushBlockSize.Update(float64(blockLen))
req, err := http.NewRequest("GET", pushURL.String(), &bb) req, err := http.NewRequest("GET", pushURL, &bb)
if err != nil { if err != nil {
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err)) panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
} }
req.Header.Set("Content-Type", "text/plain") req.Header.Set("Content-Type", "text/plain")
req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Encoding", "gzip")
for k, vs := range cfg.Headers {
for _, v := range vs {
req.Header.Add(k, v)
}
}
startTime := time.Now() startTime := time.Now()
resp, err := c.Do(req) resp, err := c.Do(req)
pushDuration.UpdateDuration(startTime) pushDuration.UpdateDuration(startTime)
@ -186,7 +161,7 @@ func InitPushWithConfig(cfg PushConfig) error {
continue continue
} }
if resp.StatusCode/100 != 2 { if resp.StatusCode/100 != 2 {
body, _ := io.ReadAll(resp.Body) body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close() _ = resp.Body.Close()
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q", log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
pushURLRedacted, resp.StatusCode, body) pushURLRedacted, resp.StatusCode, body)

View File

@ -1,73 +0,0 @@
package metrics_test
import (
"compress/gzip"
"fmt"
"io"
"net/http"
"net/http/httptest"
"time"
"github.com/VictoriaMetrics/metrics"
)
func ExampleInitPushWithConfig() {
syncCh := make(chan string)
srv := newServer(syncCh)
defer srv.Close()
cfg := metrics.PushConfig{
URL: srv.URL,
Interval: time.Millisecond * 100,
WriteMetricsFn: func(w io.Writer) {
fmt.Fprint(w, "foo{label=\"bar\"} 1\n")
fmt.Fprint(w, "foo{label=\"baz\"} 2\n")
},
}
if err := metrics.InitPushWithConfig(cfg); err != nil {
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
}
fmt.Println(<-syncCh)
// Output:
// foo{label="bar"} 1
// foo{label="baz"} 2
}
func ExampleInitPushExt() {
syncCh := make(chan string)
srv := newServer(syncCh)
defer srv.Close()
writeFn := func(w io.Writer) {
fmt.Fprint(w, "foo{label=\"bar\"} 11\n")
fmt.Fprint(w, "foo{label=\"baz\"} 22\n")
}
err := metrics.InitPushExt(srv.URL, time.Millisecond*100, "", writeFn)
if err != nil {
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
}
fmt.Println(<-syncCh)
// Output:
// foo{label="bar"} 11
// foo{label="baz"} 22
}
func newServer(ch chan string) *httptest.Server {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gr, err := gzip.NewReader(r.Body)
if err != nil {
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
}
defer gr.Close()
b, err := io.ReadAll(gr)
if err != nil {
panic(fmt.Sprintf("BUG: unexpected error: %s", err))
}
ch <- string(b)
}))
return srv
}