From fd258897112d01ac9a2b7925a50d52c3045a4e49 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 19 Dec 2023 00:18:38 +0200 Subject: [PATCH] push.go: added PushMetrics() function for force pushing of existing metrics to the specified url Thanks to @aliaaaaaaaaa for the initial implementation at https://github.com/VictoriaMetrics/metrics/pull/37 --- push.go | 339 ++++++++++++++++++++++++++++++++++----------------- push_test.go | 84 +++++++++++++ 2 files changed, 312 insertions(+), 111 deletions(-) diff --git a/push.go b/push.go index 17990a9..c62c823 100644 --- a/push.go +++ b/push.go @@ -11,6 +11,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "compress/gzip" @@ -69,10 +70,7 @@ func InitPushWithOptions(ctx context.Context, pushURL string, interval time.Dura // It is OK calling InitPushProcessMetrics multiple times with different pushURL - // in this case metrics are pushed to all the provided pushURL urls. func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels string) error { - writeMetrics := func(w io.Writer) { - WriteProcessMetrics(w) - } - return InitPushExt(pushURL, interval, extraLabels, writeMetrics) + return InitPushExt(pushURL, interval, extraLabels, WriteProcessMetrics) } // InitPush sets up periodic push for globally registered metrics to the given pushURL with the given interval. @@ -97,6 +95,21 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr return InitPushExt(pushURL, interval, extraLabels, writeMetrics) } +// PushMetrics pushes globally registered metrics to pushURL. +// +// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL. +// +// opts may contain additional configuration options if non-nil. +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +func PushMetrics(ctx context.Context, pushURL string, pushProcessMetrics bool, opts *PushOptions) error { + writeMetrics := func(w io.Writer) { + WritePrometheus(w, pushProcessMetrics) + } + return PushMetricsExt(ctx, pushURL, writeMetrics, opts) +} + // InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval. // // The periodic push is stopped when the ctx is canceled. @@ -112,10 +125,7 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr // It is OK calling InitPushWithOptions multiple times with different pushURL - // in this case metrics are pushed to all the provided pushURL urls. func (s *Set) InitPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, opts *PushOptions) error { - writeMetrics := func(w io.Writer) { - s.WritePrometheus(w) - } - return InitPushExtWithOptions(ctx, pushURL, interval, writeMetrics, opts) + return InitPushExtWithOptions(ctx, pushURL, interval, s.WritePrometheus, opts) } // InitPush sets up periodic push for metrics from s to the given pushURL with the given interval. @@ -132,10 +142,17 @@ func (s *Set) InitPushWithOptions(ctx context.Context, pushURL string, interval // It is OK calling InitPush multiple times with different pushURL - // in this case metrics are pushed to all the provided pushURL urls. func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels string) error { - writeMetrics := func(w io.Writer) { - s.WritePrometheus(w) - } - return InitPushExt(pushURL, interval, extraLabels, writeMetrics) + return InitPushExt(pushURL, interval, extraLabels, s.WritePrometheus) +} + +// PushMetrics pushes s metrics to pushURL. +// +// opts may contain additional configuration options if non-nil. +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +func (s *Set) PushMetrics(ctx context.Context, pushURL string, opts *PushOptions) error { + return PushMetricsExt(ctx, pushURL, s.WritePrometheus, opts) } // InitPushExt sets up periodic push for metrics obtained by calling writeMetrics with the given interval. @@ -179,6 +196,73 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri // It is OK calling InitPushExtWithOptions multiple times with different writeMetrics - // in this case all the metrics generated by writeMetrics callbacks are written to pushURL. func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error { + pc, err := newPushContext(pushURL, opts) + if err != nil { + return err + } + + // validate interval + if interval <= 0 { + return fmt.Errorf("interval must be positive; got %s", interval) + } + pushMetricsSet.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pc.pushURLRedacted)).Set(interval.Seconds()) + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + stopCh := ctx.Done() + for { + select { + case <-ticker.C: + ctxLocal, cancel := context.WithTimeout(ctx, interval+time.Second) + err := pc.pushMetrics(ctxLocal, writeMetrics) + cancel() + if err != nil { + log.Printf("ERROR: metrics.push: %s", err) + } + case <-stopCh: + return + } + } + }() + + return nil +} + +// PushMetricsExt pushes metrics generated by wirteMetrics to pushURL. +// +// The writeMetrics callback must write metrics to w in Prometheus text exposition format without timestamps and trailing comments. +// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format +// +// opts may contain additional configuration options if non-nil. +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +func PushMetricsExt(ctx context.Context, pushURL string, writeMetrics func(w io.Writer), opts *PushOptions) error { + pc, err := newPushContext(pushURL, opts) + if err != nil { + return err + } + return pc.pushMetrics(ctx, writeMetrics) +} + +type pushContext struct { + pushURL *url.URL + pushURLRedacted string + extraLabels string + headers http.Header + disableCompression bool + + client *http.Client + + pushesTotal *Counter + bytesPushedTotal *Counter + pushBlockSize *Histogram + pushDuration *Histogram + pushErrors *Counter +} + +func newPushContext(pushURL string, opts *PushOptions) (*pushContext, error) { if opts == nil { opts = &PushOptions{} } @@ -186,24 +270,19 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D // validate pushURL pu, err := url.Parse(pushURL) if err != nil { - return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err) + return nil, fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err) } if pu.Scheme != "http" && pu.Scheme != "https" { - return fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL) + return nil, fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL) } if pu.Host == "" { - return fmt.Errorf("missing host in pushURL=%q", pushURL) - } - - // validate interval - if interval <= 0 { - return fmt.Errorf("interval must be positive; got %s", interval) + return nil, fmt.Errorf("missing host in pushURL=%q", pushURL) } // validate ExtraLabels extraLabels := opts.ExtraLabels if err := validateTags(extraLabels); err != nil { - return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err) + return nil, fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err) } // validate Headers @@ -211,111 +290,108 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D for _, h := range opts.Headers { n := strings.IndexByte(h, ':') if n < 0 { - return fmt.Errorf("missing `:` delimiter in the header %q", h) + return nil, fmt.Errorf("missing `:` delimiter in the header %q", h) } name := strings.TrimSpace(h[:n]) value := strings.TrimSpace(h[n+1:]) headers.Add(name, value) } - // validate DisableCompression - disableCompression := opts.DisableCompression - - // Initialize metrics for the given pushURL pushURLRedacted := pu.Redacted() - pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted)) - pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted)) - bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted)) - pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted)) - pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted)) - pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds()) + client := &http.Client{} + return &pushContext{ + pushURL: pu, + pushURLRedacted: pushURLRedacted, + extraLabels: extraLabels, + headers: headers, + disableCompression: opts.DisableCompression, - c := &http.Client{ - Timeout: interval, + client: client, + + pushesTotal: pushMetricsSet.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted)), + bytesPushedTotal: pushMetricsSet.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted)), + pushBlockSize: pushMetricsSet.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted)), + pushDuration: pushMetricsSet.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted)), + pushErrors: pushMetricsSet.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted)), + }, nil +} + +func (pc *pushContext) pushMetrics(ctx context.Context, writeMetrics func(w io.Writer)) error { + bb := getBytesBuffer() + defer putBytesBuffer(bb) + + writeMetrics(bb) + + if len(pc.extraLabels) > 0 { + bbTmp := getBytesBuffer() + bbTmp.B = append(bbTmp.B[:0], bb.B...) + bb.B = addExtraLabels(bb.B[:0], bbTmp.B, pc.extraLabels) + putBytesBuffer(bbTmp) } - go func() { - ticker := time.NewTicker(interval) - var bb bytes.Buffer - var tmpBuf []byte - zw := gzip.NewWriter(&bb) - stopCh := ctx.Done() - for { - select { - case <-ticker.C: - case <-stopCh: - return - } - - bb.Reset() - writeMetrics(&bb) - if len(extraLabels) > 0 { - tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), extraLabels) - bb.Reset() - if _, err := bb.Write(tmpBuf); err != nil { - panic(fmt.Errorf("BUG: cannot write %d bytes to bytes.Buffer: %s", len(tmpBuf), err)) - } - } - if !disableCompression { - tmpBuf = append(tmpBuf[:0], bb.Bytes()...) - bb.Reset() - zw.Reset(&bb) - if _, err := zw.Write(tmpBuf); err != nil { - panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err)) - } - if err := zw.Close(); err != nil { - panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err)) - } - } - pushesTotal.Inc() - blockLen := bb.Len() - bytesPushedTotal.Add(blockLen) - pushBlockSize.Update(float64(blockLen)) - req, err := http.NewRequestWithContext(ctx, "GET", pushURL, &bb) - if err != nil { - panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err)) - } - - // Set the needed headers - for name, values := range headers { - for _, value := range values { - req.Header.Add(name, value) - } - } - req.Header.Set("Content-Type", "text/plain") - - if !disableCompression { - req.Header.Set("Content-Encoding", "gzip") - } - - // Perform the request - startTime := time.Now() - resp, err := c.Do(req) - pushDuration.UpdateDuration(startTime) - if err != nil { - if !errors.Is(err, context.Canceled) { - log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err) - pushErrorsTotal.Inc() - } - continue - } - if resp.StatusCode/100 != 2 { - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q", - pushURLRedacted, resp.StatusCode, body) - pushErrorsTotal.Inc() - continue - } - _ = resp.Body.Close() + if !pc.disableCompression { + bbTmp := getBytesBuffer() + bbTmp.B = append(bbTmp.B[:0], bb.B...) + bb.B = bb.B[:0] + zw := getGzipWriter(bb) + if _, err := zw.Write(bbTmp.B); err != nil { + panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(bbTmp.B), err)) } - }() + if err := zw.Close(); err != nil { + panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err)) + } + putGzipWriter(zw) + putBytesBuffer(bbTmp) + } + + // Update metrics + pc.pushesTotal.Inc() + blockLen := len(bb.B) + pc.bytesPushedTotal.Add(blockLen) + pc.pushBlockSize.Update(float64(blockLen)) + + // Prepare the request to sent to pc.pushURL + reqBody := bytes.NewReader(bb.B) + req, err := http.NewRequestWithContext(ctx, "GET", pc.pushURL.String(), reqBody) + if err != nil { + panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pc.pushURLRedacted, err)) + } + + // Set the needed headers + for name, values := range pc.headers { + for _, value := range values { + req.Header.Add(name, value) + } + } + req.Header.Set("Content-Type", "text/plain") + if !pc.disableCompression { + req.Header.Set("Content-Encoding", "gzip") + } + + // Perform the request + startTime := time.Now() + resp, err := pc.client.Do(req) + pc.pushDuration.UpdateDuration(startTime) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + pc.pushErrors.Inc() + return fmt.Errorf("cannot push metrics to %q: %s", pc.pushURLRedacted, err) + } + if resp.StatusCode/100 != 2 { + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + pc.pushErrors.Inc() + return fmt.Errorf("unexpected status code in response from %q: %d; expecting 2xx; response body: %q", pc.pushURLRedacted, resp.StatusCode, body) + } + _ = resp.Body.Close() return nil } -var pushMetrics = NewSet() +var pushMetricsSet = NewSet() func writePushMetrics(w io.Writer) { - pushMetrics.WritePrometheus(w) + pushMetricsSet.WritePrometheus(w) } func addExtraLabels(dst, src []byte, extraLabels string) []byte { @@ -363,3 +439,44 @@ func addExtraLabels(dst, src []byte, extraLabels string) []byte { } var bashBytes = []byte("#") + +func getBytesBuffer() *bytesBuffer { + v := bytesBufferPool.Get() + if v == nil { + return &bytesBuffer{} + } + return v.(*bytesBuffer) +} + +func putBytesBuffer(bb *bytesBuffer) { + bb.B = bb.B[:0] + bytesBufferPool.Put(bb) +} + +var bytesBufferPool sync.Pool + +type bytesBuffer struct { + B []byte +} + +func (bb *bytesBuffer) Write(p []byte) (int, error) { + bb.B = append(bb.B, p...) + return len(p), nil +} + +func getGzipWriter(w io.Writer) *gzip.Writer { + v := gzipWriterPool.Get() + if v == nil { + return gzip.NewWriter(w) + } + zw := v.(*gzip.Writer) + zw.Reset(w) + return zw +} + +func putGzipWriter(zw *gzip.Writer) { + zw.Reset(io.Discard) + gzipWriterPool.Put(zw) +} + +var gzipWriterPool sync.Pool diff --git a/push_test.go b/push_test.go index 9048b22..dd5376a 100644 --- a/push_test.go +++ b/push_test.go @@ -151,3 +151,87 @@ func TestInitPushWithOptions(t *testing.T) { Headers: []string{"Foo: Bar", "baz:aaaa-bbb"}, }, "Baz: aaaa-bbb\r\nContent-Encoding: gzip\r\nContent-Type: text/plain\r\nFoo: Bar\r\n", "bar 42.12\nfoo 1234\n") } + +func TestPushMetrics(t *testing.T) { + f := func(s *Set, opts *PushOptions, expectedHeaders, expectedData string) { + t.Helper() + + var reqHeaders []byte + var reqData []byte + var reqErr error + doneCh := make(chan struct{}) + firstRequest := true + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if firstRequest { + var bb bytes.Buffer + r.Header.WriteSubset(&bb, map[string]bool{ + "Accept-Encoding": true, + "Content-Length": true, + "User-Agent": true, + }) + reqHeaders = bb.Bytes() + reqData, reqErr = io.ReadAll(r.Body) + close(doneCh) + firstRequest = false + } + })) + defer srv.Close() + ctx := context.Background() + if err := s.PushMetrics(ctx, srv.URL, opts); err != nil { + t.Fatalf("unexpected error: %s", err) + } + select { + case <-time.After(5 * time.Second): + t.Fatalf("timeout!") + case <-doneCh: + } + if reqErr != nil { + t.Fatalf("unexpected error: %s", reqErr) + } + if opts == nil || !opts.DisableCompression { + zr, err := gzip.NewReader(bytes.NewBuffer(reqData)) + if err != nil { + t.Fatalf("cannot initialize gzip reader: %s", err) + } + data, err := io.ReadAll(zr) + if err != nil { + t.Fatalf("cannot read data from gzip reader: %s", err) + } + if err := zr.Close(); err != nil { + t.Fatalf("unexpected error when closing gzip reader: %s", err) + } + reqData = data + } + if string(reqHeaders) != expectedHeaders { + t.Fatalf("unexpected request headers; got\n%s\nwant\n%s", reqHeaders, expectedHeaders) + } + if string(reqData) != expectedData { + t.Fatalf("unexpected data; got\n%s\nwant\n%s", reqData, expectedData) + } + } + + s := NewSet() + c := s.NewCounter("foo") + c.Set(1234) + _ = s.NewGauge("bar", func() float64 { + return 42.12 + }) + + // nil PushOptions + f(s, nil, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n") + + // Disable compression on the pushed request body + f(s, &PushOptions{ + DisableCompression: true, + }, "Content-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n") + + // Add extra labels + f(s, &PushOptions{ + ExtraLabels: `label1="value1",label2="value2"`, + }, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", `bar{label1="value1",label2="value2"} 42.12`+"\n"+`foo{label1="value1",label2="value2"} 1234`+"\n") + + // Add extra headers + f(s, &PushOptions{ + Headers: []string{"Foo: Bar", "baz:aaaa-bbb"}, + }, "Baz: aaaa-bbb\r\nContent-Encoding: gzip\r\nContent-Type: text/plain\r\nFoo: Bar\r\n", "bar 42.12\nfoo 1234\n") +}