push.go: expose metrics_push_*
metrics related to metrics push process
Updates https://github.com/VictoriaMetrics/metrics/issues/35
This commit is contained in:
parent
fe970f4df0
commit
e9b4bb1534
@ -99,6 +99,7 @@ func WritePrometheus(w io.Writer, exposeProcessMetrics bool) {
|
|||||||
func WriteProcessMetrics(w io.Writer) {
|
func WriteProcessMetrics(w io.Writer) {
|
||||||
writeGoMetrics(w)
|
writeGoMetrics(w)
|
||||||
writeProcessMetrics(w)
|
writeProcessMetrics(w)
|
||||||
|
writePushMetrics(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteFDMetrics writes `process_max_fds` and `process_open_fds` metrics to w.
|
// WriteFDMetrics writes `process_max_fds` and `process_open_fds` metrics to w.
|
||||||
|
22
push.go
22
push.go
@ -112,6 +112,12 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
c := &http.Client{
|
c := &http.Client{
|
||||||
Timeout: interval,
|
Timeout: interval,
|
||||||
}
|
}
|
||||||
|
pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted))
|
||||||
|
pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted))
|
||||||
|
bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted))
|
||||||
|
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
|
||||||
|
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
|
||||||
|
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
|
||||||
go func() {
|
go func() {
|
||||||
ticker := time.NewTicker(interval)
|
ticker := time.NewTicker(interval)
|
||||||
var bb bytes.Buffer
|
var bb bytes.Buffer
|
||||||
@ -136,15 +142,22 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
if err := zw.Close(); err != nil {
|
if err := zw.Close(); err != nil {
|
||||||
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
|
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
|
||||||
}
|
}
|
||||||
|
pushesTotal.Inc()
|
||||||
|
blockLen := bb.Len()
|
||||||
|
bytesPushedTotal.Add(blockLen)
|
||||||
|
pushBlockSize.Update(float64(blockLen))
|
||||||
req, err := http.NewRequest("GET", pushURL, &bb)
|
req, err := http.NewRequest("GET", pushURL, &bb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("ERROR: metrics.push: cannot initialize request for metrics push to %q: %s", pushURLRedacted, err)
|
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "text/plain")
|
req.Header.Set("Content-Type", "text/plain")
|
||||||
req.Header.Set("Content-Encoding", "gzip")
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
|
startTime := time.Now()
|
||||||
resp, err := c.Do(req)
|
resp, err := c.Do(req)
|
||||||
|
pushDuration.UpdateDuration(startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err)
|
log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err)
|
||||||
|
pushErrorsTotal.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if resp.StatusCode/100 != 2 {
|
if resp.StatusCode/100 != 2 {
|
||||||
@ -152,6 +165,7 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
|
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
|
||||||
pushURLRedacted, resp.StatusCode, body)
|
pushURLRedacted, resp.StatusCode, body)
|
||||||
|
pushErrorsTotal.Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
@ -160,6 +174,12 @@ func InitPushExt(pushURL string, interval time.Duration, extraLabels string, wri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var pushMetrics = NewSet()
|
||||||
|
|
||||||
|
func writePushMetrics(w io.Writer) {
|
||||||
|
pushMetrics.WritePrometheus(w)
|
||||||
|
}
|
||||||
|
|
||||||
func addExtraLabels(dst, src []byte, extraLabels string) []byte {
|
func addExtraLabels(dst, src []byte, extraLabels string) []byte {
|
||||||
for len(src) > 0 {
|
for len(src) > 0 {
|
||||||
var line []byte
|
var line []byte
|
||||||
|
Loading…
Reference in New Issue
Block a user