Metrics interface and Prometheus implementation (#1929)
* Metrics interface * Prometheus implementation * NoOp implementation Co-authored-by: chris <chris@Profanity.local>
This commit is contained in:
26
metrics/prometheus/README.md
Normal file
26
metrics/prometheus/README.md
Normal file
@@ -0,0 +1,26 @@
|
||||
Prometheus
|
||||
==========
|
||||
|
||||
A Prometheus "pull" based implementation of the metrics Reporter interface.
|
||||
|
||||
|
||||
Capabilities
|
||||
------------
|
||||
|
||||
* Go runtime metrics are handled natively by the Prometheus client library (CPU / MEM / GC / GoRoutines etc).
|
||||
* User-defined metrics are registered in the Prometheus client dynamically (they must be pre-registered, hence all of the faffing around in metric_family.go).
|
||||
* The metrics are made available on a Prometheus-compatible HTTP endpoint, which can be scraped at any time. This means that the user can very easily access stats even when running locally as a standalone binary.
|
||||
* Default tags (for example the service name and version) are supplied through the metrics.DefaultTags option. These are included as constant labels with every metric.
|
||||
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
```golang
|
||||
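prometheusReporter, err := prometheus.New(metrics.DefaultTags(map[string]string{"service": "greeter"}))
if err != nil {
	fmt.Printf("Error creating the Prometheus reporter: %v", err)
}
tags := metrics.Tags{"greeter": "Janos"}
err = prometheusReporter.Count("hellos", 1, tags)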
|
||||
if err != nil {
|
||||
fmt.Printf("Error setting a Count metric: %v", err)
|
||||
}
|
||||
```
|
109
metrics/prometheus/metric_family.go
Normal file
109
metrics/prometheus/metric_family.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// metricFamily stores our cached metrics:
|
||||
type metricFamily struct {
|
||||
counters map[string]*prometheus.CounterVec
|
||||
gauges map[string]*prometheus.GaugeVec
|
||||
timings map[string]*prometheus.SummaryVec
|
||||
defaultLabels prometheus.Labels
|
||||
mutex sync.Mutex
|
||||
prometheusRegistry *prometheus.Registry
|
||||
timingObjectives map[float64]float64
|
||||
}
|
||||
|
||||
// newMetricFamily returns a new metricFamily (useful in case we want to change the structure later):
|
||||
func (r *Reporter) newMetricFamily() metricFamily {
|
||||
return metricFamily{
|
||||
counters: make(map[string]*prometheus.CounterVec),
|
||||
gauges: make(map[string]*prometheus.GaugeVec),
|
||||
timings: make(map[string]*prometheus.SummaryVec),
|
||||
defaultLabels: r.convertTags(r.options.DefaultTags),
|
||||
prometheusRegistry: r.prometheusRegistry,
|
||||
timingObjectives: r.options.TimingObjectives,
|
||||
}
|
||||
}
|
||||
|
||||
// getCounter either gets a counter, or makes a new one:
|
||||
func (mf *metricFamily) getCounter(name string, labelNames []string) *prometheus.CounterVec {
|
||||
mf.mutex.Lock()
|
||||
defer mf.mutex.Unlock()
|
||||
|
||||
// See if we already have this counter:
|
||||
counter, ok := mf.counters[name]
|
||||
if !ok {
|
||||
|
||||
// Make a new counter:
|
||||
counter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: name,
|
||||
ConstLabels: mf.defaultLabels,
|
||||
},
|
||||
labelNames,
|
||||
)
|
||||
|
||||
// Register it and add it to our list:
|
||||
mf.prometheusRegistry.MustRegister(counter)
|
||||
mf.counters[name] = counter
|
||||
}
|
||||
|
||||
return counter
|
||||
}
|
||||
|
||||
// getGauge either gets a gauge, or makes a new one:
|
||||
func (mf *metricFamily) getGauge(name string, labelNames []string) *prometheus.GaugeVec {
|
||||
mf.mutex.Lock()
|
||||
defer mf.mutex.Unlock()
|
||||
|
||||
// See if we already have this gauge:
|
||||
gauge, ok := mf.gauges[name]
|
||||
if !ok {
|
||||
|
||||
// Make a new gauge:
|
||||
gauge = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: name,
|
||||
ConstLabels: mf.defaultLabels,
|
||||
},
|
||||
labelNames,
|
||||
)
|
||||
|
||||
// Register it and add it to our list:
|
||||
mf.prometheusRegistry.MustRegister(gauge)
|
||||
mf.gauges[name] = gauge
|
||||
}
|
||||
|
||||
return gauge
|
||||
}
|
||||
|
||||
// getTiming either gets a timing, or makes a new one:
|
||||
func (mf *metricFamily) getTiming(name string, labelNames []string) *prometheus.SummaryVec {
|
||||
mf.mutex.Lock()
|
||||
defer mf.mutex.Unlock()
|
||||
|
||||
// See if we already have this timing:
|
||||
timing, ok := mf.timings[name]
|
||||
if !ok {
|
||||
|
||||
// Make a new timing:
|
||||
timing = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: name,
|
||||
ConstLabels: mf.defaultLabels,
|
||||
Objectives: mf.timingObjectives,
|
||||
},
|
||||
labelNames,
|
||||
)
|
||||
|
||||
// Register it and add it to our list:
|
||||
mf.prometheusRegistry.MustRegister(timing)
|
||||
mf.timings[name] = timing
|
||||
}
|
||||
|
||||
return timing
|
||||
}
|
68
metrics/prometheus/metrics.go
Normal file
68
metrics/prometheus/metrics.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v3/metrics"
|
||||
)
|
||||
|
||||
// ErrPrometheusPanic is the catch-all error returned by Count, Gauge and
// Timing when they recover from a panic raised while recording a metric
// (for example after changing the tag cardinality or the type of a metric).
|
||||
var ErrPrometheusPanic = errors.New("The Prometheus client panicked. Did you do something like change the tag cardinality or the type of a metric?")
|
||||
|
||||
// Count is a counter with key/value tags:
|
||||
// New values are added to any previous one (eg "number of hits")
|
||||
func (r *Reporter) Count(name string, value int64, tags metrics.Tags) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = ErrPrometheusPanic
|
||||
}
|
||||
}()
|
||||
|
||||
counter := r.metrics.getCounter(r.stripUnsupportedCharacters(name), r.listTagKeys(tags))
|
||||
metric, err := counter.GetMetricWith(r.convertTags(tags))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metric.Add(float64(value))
|
||||
return err
|
||||
}
|
||||
|
||||
// Gauge is a register with key/value tags:
|
||||
// New values simply override any previous one (eg "current connections")
|
||||
func (r *Reporter) Gauge(name string, value float64, tags metrics.Tags) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = ErrPrometheusPanic
|
||||
}
|
||||
}()
|
||||
|
||||
gauge := r.metrics.getGauge(r.stripUnsupportedCharacters(name), r.listTagKeys(tags))
|
||||
metric, err := gauge.GetMetricWith(r.convertTags(tags))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metric.Set(value)
|
||||
return err
|
||||
}
|
||||
|
||||
// Timing is a histogram with key/valye tags:
|
||||
// New values are added into a series of aggregations
|
||||
func (r *Reporter) Timing(name string, value time.Duration, tags metrics.Tags) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = ErrPrometheusPanic
|
||||
}
|
||||
}()
|
||||
|
||||
timing := r.metrics.getTiming(r.stripUnsupportedCharacters(name), r.listTagKeys(tags))
|
||||
metric, err := timing.GetMetricWith(r.convertTags(tags))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metric.Observe(value.Seconds())
|
||||
return err
|
||||
}
|
69
metrics/prometheus/reporter.go
Normal file
69
metrics/prometheus/reporter.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
log "github.com/micro/go-micro/v3/logger"
|
||||
"github.com/micro/go-micro/v3/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
// Reporter is an implementation of metrics.Reporter:
|
||||
type Reporter struct {
|
||||
options metrics.Options
|
||||
prometheusRegistry *prometheus.Registry
|
||||
metrics metricFamily
|
||||
}
|
||||
|
||||
// New returns a configured prometheus reporter:
|
||||
func New(opts ...metrics.Option) (*Reporter, error) {
|
||||
options := metrics.NewOptions(opts...)
|
||||
|
||||
// Make a prometheus registry (this keeps track of any metrics we generate):
|
||||
prometheusRegistry := prometheus.NewRegistry()
|
||||
prometheusRegistry.Register(prometheus.NewGoCollector())
|
||||
prometheusRegistry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{Namespace: "goruntime"}))
|
||||
|
||||
// Make a new Reporter:
|
||||
newReporter := &Reporter{
|
||||
options: options,
|
||||
prometheusRegistry: prometheusRegistry,
|
||||
}
|
||||
|
||||
// Add metrics families for each type:
|
||||
newReporter.metrics = newReporter.newMetricFamily()
|
||||
|
||||
// Handle the metrics endpoint with prometheus:
|
||||
log.Infof("Metrics/Prometheus [http] Listening on %s%s", options.Address, options.Path)
|
||||
http.Handle(options.Path, promhttp.HandlerFor(prometheusRegistry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))
|
||||
go http.ListenAndServe(options.Address, nil)
|
||||
|
||||
return newReporter, nil
|
||||
}
|
||||
|
||||
// convertTags turns Tags into prometheus labels:
|
||||
func (r *Reporter) convertTags(tags metrics.Tags) prometheus.Labels {
|
||||
labels := prometheus.Labels{}
|
||||
for key, value := range tags {
|
||||
labels[key] = r.stripUnsupportedCharacters(value)
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
// listTagKeys returns a list of tag keys (we need to provide this to the Prometheus client):
|
||||
func (r *Reporter) listTagKeys(tags metrics.Tags) (labelKeys []string) {
|
||||
for key := range tags {
|
||||
labelKeys = append(labelKeys, key)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// stripUnsupportedCharacters cleans up a metrics key or value:
|
||||
func (r *Reporter) stripUnsupportedCharacters(metricName string) string {
|
||||
valueWithoutDots := strings.Replace(metricName, ".", "_", -1)
|
||||
valueWithoutCommas := strings.Replace(valueWithoutDots, ",", "_", -1)
|
||||
valueWIthoutSpaces := strings.Replace(valueWithoutCommas, " ", "", -1)
|
||||
return valueWIthoutSpaces
|
||||
}
|
73
metrics/prometheus/reporter_test.go
Normal file
73
metrics/prometheus/reporter_test.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/v3/metrics"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestPrometheusReporter(t *testing.T) {
|
||||
|
||||
// Make a Reporter:
|
||||
reporter, err := New(metrics.Path("/prometheus"), metrics.DefaultTags(map[string]string{"service": "prometheus-test"}))
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, reporter)
|
||||
assert.Equal(t, "prometheus-test", reporter.options.DefaultTags["service"])
|
||||
assert.Equal(t, ":9000", reporter.options.Address)
|
||||
assert.Equal(t, "/prometheus", reporter.options.Path)
|
||||
|
||||
// Check that our implementation is valid:
|
||||
assert.Implements(t, new(metrics.Reporter), reporter)
|
||||
|
||||
// Test tag conversion:
|
||||
tags := metrics.Tags{
|
||||
"tag1": "false",
|
||||
"tag2": "true",
|
||||
}
|
||||
convertedTags := reporter.convertTags(tags)
|
||||
assert.Equal(t, "false", convertedTags["tag1"])
|
||||
assert.Equal(t, "true", convertedTags["tag2"])
|
||||
|
||||
// Test tag enumeration:
|
||||
listedTags := reporter.listTagKeys(tags)
|
||||
assert.Contains(t, listedTags, "tag1")
|
||||
assert.Contains(t, listedTags, "tag2")
|
||||
|
||||
// Test string cleaning:
|
||||
preparedMetricName := reporter.stripUnsupportedCharacters("some.kind,of tag")
|
||||
assert.Equal(t, "some_kind_oftag", preparedMetricName)
|
||||
|
||||
// Test MetricFamilies:
|
||||
metricFamily := reporter.newMetricFamily()
|
||||
|
||||
// Counters:
|
||||
assert.NotNil(t, metricFamily.getCounter("testCounter", []string{"test", "counter"}))
|
||||
assert.Len(t, metricFamily.counters, 1)
|
||||
|
||||
// Gauges:
|
||||
assert.NotNil(t, metricFamily.getGauge("testGauge", []string{"test", "gauge"}))
|
||||
assert.Len(t, metricFamily.gauges, 1)
|
||||
|
||||
// Timings:
|
||||
assert.NotNil(t, metricFamily.getTiming("testTiming", []string{"test", "timing"}))
|
||||
assert.Len(t, metricFamily.timings, 1)
|
||||
|
||||
// Test submitting metrics through the interface methods:
|
||||
assert.NoError(t, reporter.Count("test.counter.1", 6, tags))
|
||||
assert.NoError(t, reporter.Count("test.counter.2", 19, tags))
|
||||
assert.NoError(t, reporter.Count("test.counter.1", 5, tags))
|
||||
assert.NoError(t, reporter.Gauge("test.gauge.1", 99, tags))
|
||||
assert.NoError(t, reporter.Gauge("test.gauge.2", 55, tags))
|
||||
assert.NoError(t, reporter.Gauge("test.gauge.1", 98, tags))
|
||||
assert.NoError(t, reporter.Timing("test.timing.1", time.Second, tags))
|
||||
assert.NoError(t, reporter.Timing("test.timing.2", time.Minute, tags))
|
||||
assert.Len(t, reporter.metrics.counters, 2)
|
||||
assert.Len(t, reporter.metrics.gauges, 2)
|
||||
assert.Len(t, reporter.metrics.timings, 2)
|
||||
|
||||
// Test reading back the metrics:
|
||||
// This could be done by hitting the /metrics endpoint
|
||||
}
|
Reference in New Issue
Block a user