push.go: add ability to cancel periodic push via passed context

This commit is contained in:
Aliaksandr Valialkin 2023-12-17 18:23:33 +02:00
parent a9e3faa53c
commit bd3cd7b6ff
No known key found for this signature in database
GPG Key ID: 52C003EE2BCDB9EB
2 changed files with 117 additions and 11 deletions

33
push.go
View File

@ -2,6 +2,8 @@ package metrics
import ( import (
"bytes" "bytes"
"context"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -32,6 +34,8 @@ type PushOptions struct {
// InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval. // InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval.
// //
// The periodic push is stopped when ctx is canceled.
//
// If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL. // If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL.
// //
// opts may contain additional configuration options if non-nil. // opts may contain additional configuration options if non-nil.
@ -44,11 +48,11 @@ type PushOptions struct {
// //
// It is OK calling InitPushWithOptions multiple times with different pushURL - // It is OK calling InitPushWithOptions multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls. // in this case metrics are pushed to all the provided pushURL urls.
func InitPushWithOptions(pushURL string, interval time.Duration, pushProcessMetrics bool, opts *PushOptions) error { func InitPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, pushProcessMetrics bool, opts *PushOptions) error {
writeMetrics := func(w io.Writer) { writeMetrics := func(w io.Writer) {
WritePrometheus(w, pushProcessMetrics) WritePrometheus(w, pushProcessMetrics)
} }
return initPushWithOptions(pushURL, interval, writeMetrics, opts) return initPushWithOptions(ctx, pushURL, interval, writeMetrics, opts)
} }
// InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval. // InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval.
@ -95,6 +99,8 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr
// InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval. // InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval.
// //
// The periodic push is stopped when the ctx is canceled.
//
// opts may contain additional configuration options if non-nil. // opts may contain additional configuration options if non-nil.
// //
// The metrics are pushed to pushURL in Prometheus text exposition format. // The metrics are pushed to pushURL in Prometheus text exposition format.
@ -105,11 +111,11 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr
// //
// It is OK calling InitPushWithOptions multiple times with different pushURL - // It is OK calling InitPushWithOptions multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls. // in this case metrics are pushed to all the provided pushURL urls.
func (s *Set) InitPushWithOptions(pushURL string, interval time.Duration, opts *PushOptions) error { func (s *Set) InitPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, opts *PushOptions) error {
writeMetrics := func(w io.Writer) { writeMetrics := func(w io.Writer) {
s.WritePrometheus(w) s.WritePrometheus(w)
} }
return initPushWithOptions(pushURL, interval, writeMetrics, opts) return initPushWithOptions(ctx, pushURL, interval, writeMetrics, opts)
} }
// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval. // InitPush sets up periodic push for metrics from s to the given pushURL with the given interval.
@ -152,10 +158,10 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
opts := &PushOptions{ opts := &PushOptions{
ExtraLabels: extraLabels, ExtraLabels: extraLabels,
} }
return initPushWithOptions(pushURL, interval, writeMetrics, opts) return initPushWithOptions(context.Background(), pushURL, interval, writeMetrics, opts)
} }
func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error { func initPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error {
// validate pushURL // validate pushURL
pu, err := url.Parse(pushURL) pu, err := url.Parse(pushURL)
if err != nil { if err != nil {
@ -183,7 +189,7 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu
} }
// validate Headers // validate Headers
var headers http.Header headers := make(http.Header)
if opts != nil { if opts != nil {
for _, h := range opts.Headers { for _, h := range opts.Headers {
n := strings.IndexByte(h, ':') n := strings.IndexByte(h, ':')
@ -219,7 +225,14 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu
var bb bytes.Buffer var bb bytes.Buffer
var tmpBuf []byte var tmpBuf []byte
zw := gzip.NewWriter(&bb) zw := gzip.NewWriter(&bb)
for range ticker.C { stopCh := ctx.Done()
for {
select {
case <-ticker.C:
case <-stopCh:
return
}
bb.Reset() bb.Reset()
writeMetrics(&bb) writeMetrics(&bb)
if len(extraLabels) > 0 { if len(extraLabels) > 0 {
@ -244,7 +257,7 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu
blockLen := bb.Len() blockLen := bb.Len()
bytesPushedTotal.Add(blockLen) bytesPushedTotal.Add(blockLen)
pushBlockSize.Update(float64(blockLen)) pushBlockSize.Update(float64(blockLen))
req, err := http.NewRequest("GET", pushURL, &bb) req, err := http.NewRequestWithContext(ctx, "GET", pushURL, &bb)
if err != nil { if err != nil {
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err)) panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
} }
@ -266,8 +279,10 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu
resp, err := c.Do(req) resp, err := c.Do(req)
pushDuration.UpdateDuration(startTime) pushDuration.UpdateDuration(startTime)
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) {
log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err) log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err)
pushErrorsTotal.Inc() pushErrorsTotal.Inc()
}
continue continue
} }
if resp.StatusCode/100 != 2 { if resp.StatusCode/100 != 2 {

View File

@ -1,6 +1,12 @@
package metrics package metrics
import ( import (
"bytes"
"compress/gzip"
"context"
"io"
"net/http"
"net/http/httptest"
"testing" "testing"
"time" "time"
) )
@ -59,3 +65,88 @@ func TestInitPushFailure(t *testing.T) {
f("http://foobar", time.Second, `{foo="bar"}`) f("http://foobar", time.Second, `{foo="bar"}`)
f("http://foobar", time.Second, `a{foo="bar"}`) f("http://foobar", time.Second, `a{foo="bar"}`)
} }
func TestInitPushWithOptions(t *testing.T) {
f := func(s *Set, opts *PushOptions, expectedHeaders, expectedData string) {
t.Helper()
var reqHeaders []byte
var reqData []byte
var reqErr error
doneCh := make(chan struct{})
firstRequest := true
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if firstRequest {
var bb bytes.Buffer
r.Header.WriteSubset(&bb, map[string]bool{
"Accept-Encoding": true,
"Content-Length": true,
"User-Agent": true,
})
reqHeaders = bb.Bytes()
reqData, reqErr = io.ReadAll(r.Body)
close(doneCh)
firstRequest = false
}
}))
ctx, cancel := context.WithCancel(context.Background())
if err := s.InitPushWithOptions(ctx, srv.URL, time.Millisecond, opts); err != nil {
t.Fatalf("unexpected error: %s", err)
}
select {
case <-time.After(5 * time.Second):
t.Fatalf("timeout!")
case <-doneCh:
// stop the periodic pusher
cancel()
}
if reqErr != nil {
t.Fatalf("unexpected error: %s", reqErr)
}
if opts == nil || !opts.DisableCompression {
zr, err := gzip.NewReader(bytes.NewBuffer(reqData))
if err != nil {
t.Fatalf("cannot initialize gzip reader: %s", err)
}
data, err := io.ReadAll(zr)
if err != nil {
t.Fatalf("cannot read data from gzip reader: %s", err)
}
if err := zr.Close(); err != nil {
t.Fatalf("unexpected error when closing gzip reader: %s", err)
}
reqData = data
}
if string(reqHeaders) != expectedHeaders {
t.Fatalf("unexpected request headers; got\n%s\nwant\n%s", reqHeaders, expectedHeaders)
}
if string(reqData) != expectedData {
t.Fatalf("unexpected data; got\n%s\nwant\n%s", reqData, expectedData)
}
}
s := NewSet()
c := s.NewCounter("foo")
c.Set(1234)
_ = s.NewGauge("bar", func() float64 {
return 42.12
})
// nil PushOptions
f(s, nil, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n")
// Disable compression on the pushed request body
f(s, &PushOptions{
DisableCompression: true,
}, "Content-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n")
// Add extra labels
f(s, &PushOptions{
ExtraLabels: `label1="value1",label2="value2"`,
}, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", `bar{label1="value1",label2="value2"} 42.12`+"\n"+`foo{label1="value1",label2="value2"} 1234`+"\n")
// Add extra headers
f(s, &PushOptions{
Headers: []string{"Foo: Bar", "baz:aaaa-bbb"},
}, "Baz: aaaa-bbb\r\nContent-Encoding: gzip\r\nContent-Type: text/plain\r\nFoo: Bar\r\n", "bar 42.12\nfoo 1234\n")
}