push.go: add InitPushWithOptions() function, which allows extending push options without breaking backwards compatibility and without the need to introduce new functions
Overrides https://github.com/VictoriaMetrics/metrics/pull/53 Overrides https://github.com/VictoriaMetrics/metrics/pull/55 Allows specifying authorization headers via PushOptions.Headers for https://github.com/VictoriaMetrics/metrics/issues/36 Allows disabling request body compression via PushOption.DisableCompression for https://github.com/VictoriaMetrics/metrics/pull/41
This commit is contained in:
parent
447d235cbb
commit
a9e3faa53c
150
push.go
150
push.go
@ -8,11 +8,49 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"compress/gzip"
|
||||
)
|
||||
|
||||
// PushOptions is the list of options, which may be applied to InitPushWithOptions().
|
||||
type PushOptions struct {
|
||||
// ExtraLabels is an optional comma-separated list of `label="value"` labels, which must be added to all the metrics before pushing them to pushURL.
|
||||
ExtraLabels string
|
||||
|
||||
// Headers is an optional list of HTTP headers to add to every push request to pushURL.
|
||||
//
|
||||
// Every item in the list must have the form `Header: value`. For example, `Authorization: Custom my-top-secret`.
|
||||
Headers []string
|
||||
|
||||
// Whether to disable HTTP request body compression before sending the metrics to pushURL.
|
||||
//
|
||||
// By default the compression is enabled.
|
||||
DisableCompression bool
|
||||
}
|
||||
|
||||
// InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval.
|
||||
//
|
||||
// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL.
|
||||
//
|
||||
// opts may contain additional configuration options if non-nil.
|
||||
//
|
||||
// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
//
|
||||
// It is OK calling InitPushWithOptions multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func InitPushWithOptions(pushURL string, interval time.Duration, pushProcessMetrics bool, opts *PushOptions) error {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
WritePrometheus(w, pushProcessMetrics)
|
||||
}
|
||||
return initPushWithOptions(pushURL, interval, writeMetrics, opts)
|
||||
}
|
||||
|
||||
// InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval.
|
||||
//
|
||||
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||
@ -38,7 +76,7 @@ func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels
|
||||
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||
// to all the metrics before pushing them to pushURL.
|
||||
//
|
||||
// If pushProcessMetrics is set to true, then 'process_*' metrics are also pushed to pushURL.
|
||||
// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL.
|
||||
//
|
||||
// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
@ -55,12 +93,31 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr
|
||||
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
|
||||
}
|
||||
|
||||
// InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval.
|
||||
//
|
||||
// opts may contain additional configuration options if non-nil.
|
||||
//
|
||||
// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
//
|
||||
// It is OK calling InitPushWithOptions multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func (s *Set) InitPushWithOptions(pushURL string, interval time.Duration, opts *PushOptions) error {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
s.WritePrometheus(w)
|
||||
}
|
||||
return initPushWithOptions(pushURL, interval, writeMetrics, opts)
|
||||
}
|
||||
|
||||
// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval.
|
||||
//
|
||||
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
|
||||
// to all the metrics before pushing them to pushURL.
|
||||
//
|
||||
// / The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// The metrics are pushed to pushURL in Prometheus text exposition format.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
@ -90,14 +147,16 @@ func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels strin
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
//
|
||||
// It is OK calling InitPushExt multiple times with different writeMetrics -
|
||||
// in this case all the metrics generated by writeMetrics callbacks are writte to pushURL.
|
||||
// in this case all the metrics generated by writeMetrics callbacks are written to pushURL.
|
||||
func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error {
|
||||
if interval <= 0 {
|
||||
return fmt.Errorf("interval must be positive; got %s", interval)
|
||||
}
|
||||
if err := validateTags(extraLabels); err != nil {
|
||||
return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
|
||||
opts := &PushOptions{
|
||||
ExtraLabels: extraLabels,
|
||||
}
|
||||
return initPushWithOptions(pushURL, interval, writeMetrics, opts)
|
||||
}
|
||||
|
||||
func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error {
|
||||
// validate pushURL
|
||||
pu, err := url.Parse(pushURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
|
||||
@ -108,16 +167,53 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
||||
if pu.Host == "" {
|
||||
return fmt.Errorf("missing host in pushURL=%q", pushURL)
|
||||
}
|
||||
pushURLRedacted := pu.Redacted()
|
||||
c := &http.Client{
|
||||
Timeout: interval,
|
||||
|
||||
// validate interval
|
||||
if interval <= 0 {
|
||||
return fmt.Errorf("interval must be positive; got %s", interval)
|
||||
}
|
||||
|
||||
// validate ExtraLabels
|
||||
var extraLabels string
|
||||
if opts != nil {
|
||||
extraLabels = opts.ExtraLabels
|
||||
}
|
||||
if err := validateTags(extraLabels); err != nil {
|
||||
return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
|
||||
}
|
||||
|
||||
// validate Headers
|
||||
var headers http.Header
|
||||
if opts != nil {
|
||||
for _, h := range opts.Headers {
|
||||
n := strings.IndexByte(h, ':')
|
||||
if n < 0 {
|
||||
return fmt.Errorf("missing `:` delimiter in the header %q", h)
|
||||
}
|
||||
name := strings.TrimSpace(h[:n])
|
||||
value := strings.TrimSpace(h[n+1:])
|
||||
headers.Add(name, value)
|
||||
}
|
||||
}
|
||||
|
||||
// validate DisableCompression
|
||||
disableCompression := false
|
||||
if opts != nil {
|
||||
disableCompression = opts.DisableCompression
|
||||
}
|
||||
|
||||
// Initialize metrics for the given pushURL
|
||||
pushURLRedacted := pu.Redacted()
|
||||
pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted))
|
||||
pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted))
|
||||
bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted))
|
||||
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
|
||||
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
|
||||
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
|
||||
|
||||
c := &http.Client{
|
||||
Timeout: interval,
|
||||
}
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
var bb bytes.Buffer
|
||||
@ -133,14 +229,16 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
||||
panic(fmt.Errorf("BUG: cannot write %d bytes to bytes.Buffer: %s", len(tmpBuf), err))
|
||||
}
|
||||
}
|
||||
tmpBuf = append(tmpBuf[:0], bb.Bytes()...)
|
||||
bb.Reset()
|
||||
zw.Reset(&bb)
|
||||
if _, err := zw.Write(tmpBuf); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err))
|
||||
}
|
||||
if err := zw.Close(); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
|
||||
if !disableCompression {
|
||||
tmpBuf = append(tmpBuf[:0], bb.Bytes()...)
|
||||
bb.Reset()
|
||||
zw.Reset(&bb)
|
||||
if _, err := zw.Write(tmpBuf); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err))
|
||||
}
|
||||
if err := zw.Close(); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
|
||||
}
|
||||
}
|
||||
pushesTotal.Inc()
|
||||
blockLen := bb.Len()
|
||||
@ -150,8 +248,20 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
|
||||
}
|
||||
|
||||
// Set the needed headers
|
||||
for name, values := range headers {
|
||||
for _, value := range values {
|
||||
req.Header.Add(name, value)
|
||||
}
|
||||
}
|
||||
req.Header.Set("Content-Type", "text/plain")
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
|
||||
if !disableCompression {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
}
|
||||
|
||||
// Perform the request
|
||||
startTime := time.Now()
|
||||
resp, err := c.Do(req)
|
||||
pushDuration.UpdateDuration(startTime)
|
||||
|
Loading…
x
Reference in New Issue
Block a user