diff --git a/push.go b/push.go index afd34d0..4f69bc0 100644 --- a/push.go +++ b/push.go @@ -2,6 +2,8 @@ package metrics import ( "bytes" + "context" + "errors" "fmt" "io" "io/ioutil" @@ -32,6 +34,8 @@ type PushOptions struct { // InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval. // +// The periodic push is stopped when ctx is canceled. +// // If pushProcessMetrics is set to true, then 'process_*' and `go_*` metrics are also pushed to pushURL. // // opts may contain additional configuration options if non-nil. @@ -44,11 +48,11 @@ type PushOptions struct { // // It is OK calling InitPushWithOptions multiple times with different pushURL - // in this case metrics are pushed to all the provided pushURL urls. -func InitPushWithOptions(pushURL string, interval time.Duration, pushProcessMetrics bool, opts *PushOptions) error { +func InitPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, pushProcessMetrics bool, opts *PushOptions) error { writeMetrics := func(w io.Writer) { WritePrometheus(w, pushProcessMetrics) } - return initPushWithOptions(pushURL, interval, writeMetrics, opts) + return initPushWithOptions(ctx, pushURL, interval, writeMetrics, opts) } // InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval. @@ -95,6 +99,8 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr // InitPushWithOptions sets up periodic push for metrics from s to the given pushURL with the given interval. // +// The periodic push is stopped when the ctx is canceled. +// // opts may contain additional configuration options if non-nil. // // The metrics are pushed to pushURL in Prometheus text exposition format. @@ -105,11 +111,11 @@ func InitPush(pushURL string, interval time.Duration, extraLabels string, pushPr // // It is OK calling InitPushWithOptions multiple times with different pushURL - // in this case metrics are pushed to all the provided pushURL urls. -func (s *Set) InitPushWithOptions(pushURL string, interval time.Duration, opts *PushOptions) error { +func (s *Set) InitPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, opts *PushOptions) error { writeMetrics := func(w io.Writer) { s.WritePrometheus(w) } - return initPushWithOptions(pushURL, interval, writeMetrics, opts) + return initPushWithOptions(ctx, pushURL, interval, writeMetrics, opts) } // InitPush sets up periodic push for metrics from s to the given pushURL with the given interval. @@ -152,10 +158,10 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri opts := &PushOptions{ ExtraLabels: extraLabels, } - return initPushWithOptions(pushURL, interval, writeMetrics, opts) + return initPushWithOptions(context.Background(), pushURL, interval, writeMetrics, opts) } -func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error { +func initPushWithOptions(ctx context.Context, pushURL string, interval time.Duration, writeMetrics func(w io.Writer), opts *PushOptions) error { // validate pushURL pu, err := url.Parse(pushURL) if err != nil { @@ -183,7 +189,7 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu } // validate Headers - var headers http.Header + headers := make(http.Header) if opts != nil { for _, h := range opts.Headers { n := strings.IndexByte(h, ':') @@ -219,7 +225,14 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu var bb bytes.Buffer var tmpBuf []byte zw := gzip.NewWriter(&bb) - for range ticker.C { + stopCh := ctx.Done() + for { + select { + case <-ticker.C: + case <-stopCh: + return + } + bb.Reset() writeMetrics(&bb) if len(extraLabels) > 0 { @@ -244,7 +257,7 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu blockLen := bb.Len() bytesPushedTotal.Add(blockLen) pushBlockSize.Update(float64(blockLen)) - req, err := http.NewRequest("GET", pushURL, &bb) + req, err := http.NewRequestWithContext(ctx, "GET", pushURL, &bb) if err != nil { panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err)) } @@ -266,8 +279,10 @@ func initPushWithOptions(pushURL string, interval time.Duration, writeMetrics fu resp, err := c.Do(req) pushDuration.UpdateDuration(startTime) if err != nil { - log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err) - pushErrorsTotal.Inc() + if !errors.Is(err, context.Canceled) { + log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err) + pushErrorsTotal.Inc() + } continue } if resp.StatusCode/100 != 2 { diff --git a/push_test.go b/push_test.go index 0e495b0..eedcb76 100644 --- a/push_test.go +++ b/push_test.go @@ -1,6 +1,12 @@ package metrics import ( + "bytes" + "compress/gzip" + "context" + "io" + "net/http" + "net/http/httptest" "testing" "time" ) @@ -59,3 +65,88 @@ func TestInitPushFailure(t *testing.T) { f("http://foobar", time.Second, `{foo="bar"}`) f("http://foobar", time.Second, `a{foo="bar"}`) } + +func TestInitPushWithOptions(t *testing.T) { + f := func(s *Set, opts *PushOptions, expectedHeaders, expectedData string) { + t.Helper() + + var reqHeaders []byte + var reqData []byte + var reqErr error + doneCh := make(chan struct{}) + firstRequest := true + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if firstRequest { + var bb bytes.Buffer + r.Header.WriteSubset(&bb, map[string]bool{ + "Accept-Encoding": true, + "Content-Length": true, + "User-Agent": true, + }) + reqHeaders = bb.Bytes() + reqData, reqErr = io.ReadAll(r.Body) + close(doneCh) + firstRequest = false + } + })) + ctx, cancel := context.WithCancel(context.Background()) + if err := s.InitPushWithOptions(ctx, srv.URL, time.Millisecond, opts); err != nil { + t.Fatalf("unexpected error: %s", err) + } + select { + case <-time.After(5 * time.Second): + t.Fatalf("timeout!") + case <-doneCh: + // stop the periodic pusher + cancel() + } + if reqErr != nil { + t.Fatalf("unexpected error: %s", reqErr) + } + if opts == nil || !opts.DisableCompression { + zr, err := gzip.NewReader(bytes.NewBuffer(reqData)) + if err != nil { + t.Fatalf("cannot initialize gzip reader: %s", err) + } + data, err := io.ReadAll(zr) + if err != nil { + t.Fatalf("cannot read data from gzip reader: %s", err) + } + if err := zr.Close(); err != nil { + t.Fatalf("unexpected error when closing gzip reader: %s", err) + } + reqData = data + } + if string(reqHeaders) != expectedHeaders { + t.Fatalf("unexpected request headers; got\n%s\nwant\n%s", reqHeaders, expectedHeaders) + } + if string(reqData) != expectedData { + t.Fatalf("unexpected data; got\n%s\nwant\n%s", reqData, expectedData) + } + } + + s := NewSet() + c := s.NewCounter("foo") + c.Set(1234) + _ = s.NewGauge("bar", func() float64 { + return 42.12 + }) + + // nil PushOptions + f(s, nil, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n") + + // Disable compression on the pushed request body + f(s, &PushOptions{ + DisableCompression: true, + }, "Content-Type: text/plain\r\n", "bar 42.12\nfoo 1234\n") + + // Add extra labels + f(s, &PushOptions{ + ExtraLabels: `label1="value1",label2="value2"`, + }, "Content-Encoding: gzip\r\nContent-Type: text/plain\r\n", `bar{label1="value1",label2="value2"} 42.12`+"\n"+`foo{label1="value1",label2="value2"} 1234`+"\n") + + // Add extra headers + f(s, &PushOptions{ + Headers: []string{"Foo: Bar", "baz:aaaa-bbb"}, + }, "Baz: aaaa-bbb\r\nContent-Encoding: gzip\r\nContent-Type: text/plain\r\nFoo: Bar\r\n", "bar 42.12\nfoo 1234\n") +}