push.go: added PushMetrics() function for force pushing of existing metrics to the specified url
Thanks to @aliaaaaaaaaa for the initial implementation at https://github.com/VictoriaMetrics/metrics/pull/37
This commit is contained in:
parent
bee9e4faf1
commit
fd25889711
339
push.go
339
push.go
@ -11,6 +11,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"compress/gzip"
|
||||
@ -69,10 +70,7 @@ func InitPushWithOptions(ctx context.Context, pushURL string, interval time.Dura
|
||||
// It is OK calling InitPushProcessMetrics multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels string) error {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
WriteProcessMetrics(w)
|
||||
}
|
||||
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
|
||||
return InitPushExt(pushURL, interval, extraLabels, WriteProcessMetrics)
|
||||
}
|
||||
|
||||
// InitPush sets up periodic push for globally registered metrics to the given pushURL with the given interval.
|
||||
@ -97,6 +95,21 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr
|
||||
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
|
||||
}
|
||||
|
||||
// PushMetrics pushes globally registered metrics to pushURL.
|
||||
//
|
||||
// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL.
|
||||
//
|
||||
// opts may contain additional configuration options if non-nil.
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
func PushMetrics(ctx context.Context, pushURL string, pushProcessMetrics bool, opts *PushOptions) error {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
WritePrometheus(w, pushProcessMetrics)
|
||||
}
|
||||
return PushMetricsExt(ctx, pushURL, writeMetrics, opts)
|
||||
}
|
||||
|
||||
// InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval.
|
||||
//
|
||||
// The periodic push is stopped when the ctx is canceled.
|
||||
@ -112,10 +125,7 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr
|
||||
// It is OK calling InitPushWithOptions multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func (s *Set) InitPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, opts *PushOptions) error {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
s.WritePrometheus(w)
|
||||
}
|
||||
return InitPushExtWithOptions(ctx, pushURL, interval, writeMetrics, opts)
|
||||
return InitPushExtWithOptions(ctx, pushURL, interval, s.WritePrometheus, opts)
|
||||
}
|
||||
|
||||
// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval.
|
||||
@ -132,10 +142,17 @@ func (s *Set) InitPushWithOptions(ctx context.Context, pushURL string, interval
|
||||
// It is OK calling InitPush multiple times with different pushURL -
|
||||
// in this case metrics are pushed to all the provided pushURL urls.
|
||||
func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels string) error {
|
||||
writeMetrics := func(w io.Writer) {
|
||||
s.WritePrometheus(w)
|
||||
}
|
||||
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
|
||||
return InitPushExt(pushURL, interval, extraLabels, s.WritePrometheus)
|
||||
}
|
||||
|
||||
// PushMetrics pushes s metrics to pushURL.
|
||||
//
|
||||
// opts may contain additional configuration options if non-nil.
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
func (s *Set) PushMetrics(ctx context.Context, pushURL string, opts *PushOptions) error {
|
||||
return PushMetricsExt(ctx, pushURL, s.WritePrometheus, opts)
|
||||
}
|
||||
|
||||
// InitPushExt sets up periodic push for metrics obtained by calling writeMetrics with the given interval.
|
||||
@ -179,6 +196,73 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
||||
// It is OK calling InitPushExtWithOptions multiple times with different writeMetrics -
|
||||
// in this case all the metrics generated by writeMetrics callbacks are written to pushURL.
|
||||
func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error {
|
||||
pc, err := newPushContext(pushURL, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// validate interval
|
||||
if interval <= 0 {
|
||||
return fmt.Errorf("interval must be positive; got %s", interval)
|
||||
}
|
||||
pushMetricsSet.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pc.pushURLRedacted)).Set(interval.Seconds())
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
stopCh := ctx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
ctxLocal, cancel := context.WithTimeout(ctx, interval+time.Second)
|
||||
err := pc.pushMetrics(ctxLocal, writeMetrics)
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Printf("ERROR: metrics.push: %s", err)
|
||||
}
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushMetricsExt pushes metrics generated by wirteMetrics to pushURL.
|
||||
//
|
||||
// The writeMetrics callback must write metrics to w in Prometheus text exposition format without timestamps and trailing comments.
|
||||
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
|
||||
//
|
||||
// opts may contain additional configuration options if non-nil.
|
||||
//
|
||||
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
|
||||
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
|
||||
func PushMetricsExt(ctx context.Context, pushURL string, writeMetrics func(w io.Writer), opts *PushOptions) error {
|
||||
pc, err := newPushContext(pushURL, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return pc.pushMetrics(ctx, writeMetrics)
|
||||
}
|
||||
|
||||
type pushContext struct {
|
||||
pushURL *url.URL
|
||||
pushURLRedacted string
|
||||
extraLabels string
|
||||
headers http.Header
|
||||
disableCompression bool
|
||||
|
||||
client *http.Client
|
||||
|
||||
pushesTotal *Counter
|
||||
bytesPushedTotal *Counter
|
||||
pushBlockSize *Histogram
|
||||
pushDuration *Histogram
|
||||
pushErrors *Counter
|
||||
}
|
||||
|
||||
func newPushContext(pushURL string, opts *PushOptions) (*pushContext, error) {
|
||||
if opts == nil {
|
||||
opts = &PushOptions{}
|
||||
}
|
||||
@ -186,24 +270,19 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D
|
||||
// validate pushURL
|
||||
pu, err := url.Parse(pushURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
|
||||
return nil, fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
|
||||
}
|
||||
if pu.Scheme != "http" && pu.Scheme != "https" {
|
||||
return fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL)
|
||||
return nil, fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL)
|
||||
}
|
||||
if pu.Host == "" {
|
||||
return fmt.Errorf("missing host in pushURL=%q", pushURL)
|
||||
}
|
||||
|
||||
// validate interval
|
||||
if interval <= 0 {
|
||||
return fmt.Errorf("interval must be positive; got %s", interval)
|
||||
return nil, fmt.Errorf("missing host in pushURL=%q", pushURL)
|
||||
}
|
||||
|
||||
// validate ExtraLabels
|
||||
extraLabels := opts.ExtraLabels
|
||||
if err := validateTags(extraLabels); err != nil {
|
||||
return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
|
||||
return nil, fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
|
||||
}
|
||||
|
||||
// validate Headers
|
||||
@ -211,111 +290,108 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D
|
||||
for _, h := range opts.Headers {
|
||||
n := strings.IndexByte(h, ':')
|
||||
if n < 0 {
|
||||
return fmt.Errorf("missing `:` delimiter in the header %q", h)
|
||||
return nil, fmt.Errorf("missing `:` delimiter in the header %q", h)
|
||||
}
|
||||
name := strings.TrimSpace(h[:n])
|
||||
value := strings.TrimSpace(h[n+1:])
|
||||
headers.Add(name, value)
|
||||
}
|
||||
|
||||
// validate DisableCompression
|
||||
disableCompression := opts.DisableCompression
|
||||
|
||||
// Initialize metrics for the given pushURL
|
||||
pushURLRedacted := pu.Redacted()
|
||||
pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted))
|
||||
pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted))
|
||||
bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted))
|
||||
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
|
||||
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
|
||||
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
|
||||
client := &http.Client{}
|
||||
return &pushContext{
|
||||
pushURL: pu,
|
||||
pushURLRedacted: pushURLRedacted,
|
||||
extraLabels: extraLabels,
|
||||
headers: headers,
|
||||
disableCompression: opts.DisableCompression,
|
||||
|
||||
c := &http.Client{
|
||||
Timeout: interval,
|
||||
client: client,
|
||||
|
||||
pushesTotal: pushMetricsSet.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted)),
|
||||
bytesPushedTotal: pushMetricsSet.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted)),
|
||||
pushBlockSize: pushMetricsSet.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted)),
|
||||
pushDuration: pushMetricsSet.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted)),
|
||||
pushErrors: pushMetricsSet.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pc *pushContext) pushMetrics(ctx context.Context, writeMetrics func(w io.Writer)) error {
|
||||
bb := getBytesBuffer()
|
||||
defer putBytesBuffer(bb)
|
||||
|
||||
writeMetrics(bb)
|
||||
|
||||
if len(pc.extraLabels) > 0 {
|
||||
bbTmp := getBytesBuffer()
|
||||
bbTmp.B = append(bbTmp.B[:0], bb.B...)
|
||||
bb.B = addExtraLabels(bb.B[:0], bbTmp.B, pc.extraLabels)
|
||||
putBytesBuffer(bbTmp)
|
||||
}
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
var bb bytes.Buffer
|
||||
var tmpBuf []byte
|
||||
zw := gzip.NewWriter(&bb)
|
||||
stopCh := ctx.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
|
||||
bb.Reset()
|
||||
writeMetrics(&bb)
|
||||
if len(extraLabels) > 0 {
|
||||
tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), extraLabels)
|
||||
bb.Reset()
|
||||
if _, err := bb.Write(tmpBuf); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot write %d bytes to bytes.Buffer: %s", len(tmpBuf), err))
|
||||
}
|
||||
}
|
||||
if !disableCompression {
|
||||
tmpBuf = append(tmpBuf[:0], bb.Bytes()...)
|
||||
bb.Reset()
|
||||
zw.Reset(&bb)
|
||||
if _, err := zw.Write(tmpBuf); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err))
|
||||
}
|
||||
if err := zw.Close(); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
|
||||
}
|
||||
}
|
||||
pushesTotal.Inc()
|
||||
blockLen := bb.Len()
|
||||
bytesPushedTotal.Add(blockLen)
|
||||
pushBlockSize.Update(float64(blockLen))
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", pushURL, &bb)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
|
||||
}
|
||||
|
||||
// Set the needed headers
|
||||
for name, values := range headers {
|
||||
for _, value := range values {
|
||||
req.Header.Add(name, value)
|
||||
}
|
||||
}
|
||||
req.Header.Set("Content-Type", "text/plain")
|
||||
|
||||
if !disableCompression {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
}
|
||||
|
||||
// Perform the request
|
||||
startTime := time.Now()
|
||||
resp, err := c.Do(req)
|
||||
pushDuration.UpdateDuration(startTime)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err)
|
||||
pushErrorsTotal.Inc()
|
||||
}
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
|
||||
pushURLRedacted, resp.StatusCode, body)
|
||||
pushErrorsTotal.Inc()
|
||||
continue
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if !pc.disableCompression {
|
||||
bbTmp := getBytesBuffer()
|
||||
bbTmp.B = append(bbTmp.B[:0], bb.B...)
|
||||
bb.B = bb.B[:0]
|
||||
zw := getGzipWriter(bb)
|
||||
if _, err := zw.Write(bbTmp.B); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(bbTmp.B), err))
|
||||
}
|
||||
}()
|
||||
if err := zw.Close(); err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
|
||||
}
|
||||
putGzipWriter(zw)
|
||||
putBytesBuffer(bbTmp)
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
pc.pushesTotal.Inc()
|
||||
blockLen := len(bb.B)
|
||||
pc.bytesPushedTotal.Add(blockLen)
|
||||
pc.pushBlockSize.Update(float64(blockLen))
|
||||
|
||||
// Prepare the request to sent to pc.pushURL
|
||||
reqBody := bytes.NewReader(bb.B)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", pc.pushURL.String(), reqBody)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pc.pushURLRedacted, err))
|
||||
}
|
||||
|
||||
// Set the needed headers
|
||||
for name, values := range pc.headers {
|
||||
for _, value := range values {
|
||||
req.Header.Add(name, value)
|
||||
}
|
||||
}
|
||||
req.Header.Set("Content-Type", "text/plain")
|
||||
if !pc.disableCompression {
|
||||
req.Header.Set("Content-Encoding", "gzip")
|
||||
}
|
||||
|
||||
// Perform the request
|
||||
startTime := time.Now()
|
||||
resp, err := pc.client.Do(req)
|
||||
pc.pushDuration.UpdateDuration(startTime)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return nil
|
||||
}
|
||||
pc.pushErrors.Inc()
|
||||
return fmt.Errorf("cannot push metrics to %q: %s", pc.pushURLRedacted, err)
|
||||
}
|
||||
if resp.StatusCode/100 != 2 {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
pc.pushErrors.Inc()
|
||||
return fmt.Errorf("unexpected status code in response from %q: %d; expecting 2xx; response body: %q", pc.pushURLRedacted, resp.StatusCode, body)
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
var pushMetrics = NewSet()
|
||||
var pushMetricsSet = NewSet()
|
||||
|
||||
func writePushMetrics(w io.Writer) {
|
||||
pushMetrics.WritePrometheus(w)
|
||||
pushMetricsSet.WritePrometheus(w)
|
||||
}
|
||||
|
||||
func addExtraLabels(dst, src []byte, extraLabels string) []byte {
|
||||
@ -363,3 +439,44 @@ func addExtraLabels(dst, src []byte, extraLabels string) []byte {
|
||||
}
|
||||
|
||||
var bashBytes = []byte("#")
|
||||
|
||||
func getBytesBuffer() *bytesBuffer {
|
||||
v := bytesBufferPool.Get()
|
||||
if v == nil {
|
||||
return &bytesBuffer{}
|
||||
}
|
||||
return v.(*bytesBuffer)
|
||||
}
|
||||
|
||||
func putBytesBuffer(bb *bytesBuffer) {
|
||||
bb.B = bb.B[:0]
|
||||
bytesBufferPool.Put(bb)
|
||||
}
|
||||
|
||||
var bytesBufferPool sync.Pool
|
||||
|
||||
type bytesBuffer struct {
|
||||
B []byte
|
||||
}
|
||||
|
||||
func (bb *bytesBuffer) Write(p []byte) (int, error) {
|
||||
bb.B = append(bb.B, p...)
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func getGzipWriter(w io.Writer) *gzip.Writer {
|
||||
v := gzipWriterPool.Get()
|
||||
if v == nil {
|
||||
return gzip.NewWriter(w)
|
||||
}
|
||||
zw := v.(*gzip.Writer)
|
||||
zw.Reset(w)
|
||||
return zw
|
||||
}
|
||||
|
||||
func putGzipWriter(zw *gzip.Writer) {
|
||||
zw.Reset(io.Discard)
|
||||
gzipWriterPool.Put(zw)
|
||||
}
|
||||
|
||||
var gzipWriterPool sync.Pool
|
||||
|
84
push_test.go
84
push_test.go
@ -151,3 +151,87 @@ func TestInitPushWithOptions(t *testing.T) {
|
||||
Headers: []string{"Foo: Bar", "baz:aaaa-bbb"},
|
||||
}, "Baz: aaaa-bbb\r\nContent-Encoding: gzip\r\nContent-Type: text/plain\r\nFoo: Bar\r\n", "bar 42.12\nfoo 1234\n")
|
||||
}
|
||||
|
||||
func TestPushMetrics(t *testing.T) {
|
||||
f := func(s *Set, opts *PushOptions, expectedHeaders, expectedData string) {
|
||||
t.Helper()
|
||||
|
||||
var reqHeaders []byte
|
||||
var reqData []byte
|
||||
var reqErr error
|
||||
doneCh := make(chan struct{})
|
||||
firstRequest := true
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if firstRequest {
|
||||
var bb bytes.Buffer
|
||||
r.Header.WriteSubset(&bb, map[string]bool{
|
||||
"Accept-Encoding": true,
|
||||
"Content-Length": true,
|
||||
"User-Agent": true,
|
||||
})
|
||||
reqHeaders = bb.Bytes()
|
||||
reqData, reqErr = io.ReadAll(r.Body)
|
||||
close(doneCh)
|
||||
firstRequest = false
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
ctx := context.Background()
|
||||
if err := s.PushMetrics(ctx, srv.URL, opts); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatalf("timeout!")
|
||||
case <-doneCh:
|
||||
}
|
||||
if reqErr != nil {
|
||||
t.Fatalf("unexpected error: %s", reqErr)
|
||||
}
|
||||
if opts == nil || !opts.DisableCompression {
|
||||
zr, err := gzip.NewReader(bytes.NewBuffer(reqData))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize gzip reader: %s", err)
|
||||
}
|
||||
data, err := io.ReadAll(zr)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot read data from gzip reader: %s", err)
|
||||
}
|
||||
if err := zr.Close(); err != nil {
|
||||
t.Fatalf("unexpected error when closing gzip reader: %s", err)
|
||||
}
|
||||
reqData = data
|
||||
}
|
||||
if string(reqHeaders) != expectedHeaders {
|
||||
t.Fatalf("unexpected request headers; got\n%s\nwant\n%s", reqHeaders, expectedHeaders)
|
||||
}
|
||||
if string(reqData) != expectedData {
|
||||
t.Fatalf("unexpected data; got\n%s\nwant\n%s", reqData, expectedData)
|
||||
}
|
||||
}
|
||||
|
||||
s := NewSet()
|
||||
c := s.NewCounter("foo")
|
||||
c.Set(1234)
|
||||
_ = s.NewGauge("bar", func() float64 {
|
||||
return 42.12
|
||||
})
|
||||
|
||||
// nil PushOptions
|
||||
f(s, nil, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n")
|
||||
|
||||
// Disable compression on the pushed request body
|
||||
f(s, &PushOptions{
|
||||
DisableCompression: true,
|
||||
}, "Content-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n")
|
||||
|
||||
// Add extra labels
|
||||
f(s, &PushOptions{
|
||||
ExtraLabels: `label1="value1",label2="value2"`,
|
||||
}, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", `bar{label1="value1",label2="value2"} 42.12`+"\n"+`foo{label1="value1",label2="value2"} 1234`+"\n")
|
||||
|
||||
// Add extra headers
|
||||
f(s, &PushOptions{
|
||||
Headers: []string{"Foo: Bar", "baz:aaaa-bbb"},
|
||||
}, "Baz: aaaa-bbb\r\nContent-Encoding: gzip\r\nContent-Type: text/plain\r\nFoo: Bar\r\n", "bar 42.12\nfoo 1234\n")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user