diff --git a/push.go b/push.go index 4215f48..afd34d0 100644 --- a/push.go +++ b/push.go @@ -8,11 +8,49 @@ import ( "log" "net/http" "net/url" + "strings" "time" "compress/gzip" ) +// PushOptions is the list of options, which may be applied to InitPushWithOptions(). +type PushOptions struct { + // ExtraLabels is an optional comma-separated list of `label="value"` labels, which must be added to all the metrics before pushing them to pushURL. + ExtraLabels string + + // Headers is an optional list of HTTP headers to add to every push request to pushURL. + // + // Every item in the list must have the form `Header: value`. For example, `Authorization: Custom my-top-secret`. + Headers []string + + // Whether to disable HTTP request body compression before sending the metrics to pushURL. + // + // By default the compression is enabled. + DisableCompression bool +} + +// InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval. +// +// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL. +// +// opts may contain additional configuration options if non-nil. +// +// The metrics are pushed to pushURL in Prometheus text exposition format. +// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +// +// It is OK calling InitPushWithOptions multiple times with different pushURL - +// in this case metrics are pushed to all the provided pushURL urls. +func InitPushWithOptions(pushURL string, interval time.Duration, pushProcessMetrics bool, opts *PushOptions) error { + writeMetrics := func(w io.Writer) { + WritePrometheus(w, pushProcessMetrics) + } + return initPushWithOptions(pushURL, interval, writeMetrics, opts) +} + // InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval. // // extraLabels may contain comma-separated list of `label="value"` labels, which will be added @@ -38,7 +76,7 @@ func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels // extraLabels may contain comma-separated list of `label="value"` labels, which will be added // to all the metrics before pushing them to pushURL. // -// If pushProcessMetrics is set to true, then 'process_*' metrics are also pushed to pushURL. +// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL. // // The metrics are pushed to pushURL in Prometheus text exposition format. // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format @@ -55,12 +93,31 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr return InitPushExt(pushURL, interval, extraLabels, writeMetrics) } +// InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval. +// +// opts may contain additional configuration options if non-nil. +// +// The metrics are pushed to pushURL in Prometheus text exposition format. +// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format +// +// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to +// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format +// +// It is OK calling InitPushWithOptions multiple times with different pushURL - +// in this case metrics are pushed to all the provided pushURL urls. +func (s *Set) InitPushWithOptions(pushURL string, interval time.Duration, opts *PushOptions) error { + writeMetrics := func(w io.Writer) { + s.WritePrometheus(w) + } + return initPushWithOptions(pushURL, interval, writeMetrics, opts) +} + // InitPush sets up periodic push for metrics from s to the given pushURL with the given interval. // // extraLabels may contain comma-separated list of `label="value"` labels, which will be added // to all the metrics before pushing them to pushURL. // -// / The metrics are pushed to pushURL in Prometheus text exposition format. +// The metrics are pushed to pushURL in Prometheus text exposition format. // See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format // // It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to @@ -90,14 +147,16 @@ func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels strin // in this case metrics are pushed to all the provided pushURL urls. // // It is OK calling InitPushExt multiple times with different writeMetrics - -// in this case all the metrics generated by writeMetrics callbacks are writte to pushURL. +// in this case all the metrics generated by writeMetrics callbacks are written to pushURL. func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error { - if interval <= 0 { - return fmt.Errorf("interval must be positive; got %s", interval) - } - if err := validateTags(extraLabels); err != nil { - return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err) + opts := &PushOptions{ + ExtraLabels: extraLabels, } + return initPushWithOptions(pushURL, interval, writeMetrics, opts) +} + +func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error { + // validate pushURL pu, err := url.Parse(pushURL) if err != nil { return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err) @@ -108,16 +167,53 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri if pu.Host == "" { return fmt.Errorf("missing host in pushURL=%q", pushURL) } - pushURLRedacted := pu.Redacted() - c := &http.Client{ - Timeout: interval, + + // validate interval + if interval <= 0 { + return fmt.Errorf("interval must be positive; got %s", interval) } + + // validate ExtraLabels + var extraLabels string + if opts != nil { + extraLabels = opts.ExtraLabels + } + if err := validateTags(extraLabels); err != nil { + return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err) + } + + // validate Headers + var headers http.Header + if opts != nil { + for _, h := range opts.Headers { + n := strings.IndexByte(h, ':') + if n < 0 { + return fmt.Errorf("missing `:` delimiter in the header %q", h) + } + name := strings.TrimSpace(h[:n]) + value := strings.TrimSpace(h[n+1:]) + headers.Add(name, value) + } + } + + // validate DisableCompression + disableCompression := false + if opts != nil { + disableCompression = opts.DisableCompression + } + + // Initialize metrics for the given pushURL + pushURLRedacted := pu.Redacted() pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted)) pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted)) bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted)) pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted)) pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted)) pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds()) + + c := &http.Client{ + Timeout: interval, + } go func() { ticker := time.NewTicker(interval) var bb bytes.Buffer @@ -133,14 +229,16 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri panic(fmt.Errorf("BUG: cannot write %d bytes to bytes.Buffer: %s", len(tmpBuf), err)) } } - tmpBuf = append(tmpBuf[:0], bb.Bytes()...) - bb.Reset() - zw.Reset(&bb) - if _, err := zw.Write(tmpBuf); err != nil { - panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err)) - } - if err := zw.Close(); err != nil { - panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err)) + if !disableCompression { + tmpBuf = append(tmpBuf[:0], bb.Bytes()...) + bb.Reset() + zw.Reset(&bb) + if _, err := zw.Write(tmpBuf); err != nil { + panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err)) + } + if err := zw.Close(); err != nil { + panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err)) + } } pushesTotal.Inc() blockLen := bb.Len() @@ -150,8 +248,20 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri if err != nil { panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err)) } + + // Set the needed headers + for name, values := range headers { + for _, value := range values { + req.Header.Add(name, value) + } + } req.Header.Set("Content-Type", "text/plain") - req.Header.Set("Content-Encoding", "gzip") + + if !disableCompression { + req.Header.Set("Content-Encoding", "gzip") + } + + // Perform the request startTime := time.Now() resp, err := c.Do(req) pushDuration.UpdateDuration(startTime)