From f2c41beb07c66125580755ec44bbcf4d51904d67 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 10 Jun 2020 16:04:12 +0300 Subject: [PATCH] wrapper/monitoring/prometheus: fix multiservice wrapper usage Signed-off-by: Vasiliy Tolstov --- prometheus.go | 372 +++++++++++++++++++-------------------------- prometheus_test.go | 47 ++---- 2 files changed, 168 insertions(+), 251 deletions(-) diff --git a/prometheus.go b/prometheus.go index e357968..096753f 100644 --- a/prometheus.go +++ b/prometheus.go @@ -3,6 +3,7 @@ package prometheus import ( "context" "fmt" + "sync" "github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/logger" @@ -13,133 +14,140 @@ import ( var ( // default metric prefix - DefaultMetricPrefix = "micro" + DefaultMetricPrefix = "micro_" // default label prefix - DefaultLabelPrefix = "micro" -) + DefaultLabelPrefix = "micro_" -type wrapper struct { opsCounter *prometheus.CounterVec timeCounterSummary *prometheus.SummaryVec timeCounterHistogram *prometheus.HistogramVec - callFunc client.CallFunc + + mu sync.Mutex +) + +type Options struct { + Name string + Version string + ID string +} + +type Option func(*Options) + +func ServiceName(name string) Option { + return func(opts *Options) { + opts.Name = name + } +} + +func ServiceVersion(version string) Option { + return func(opts *Options) { + opts.Version = version + } +} + +func ServiceID(id string) Option { + return func(opts *Options) { + opts.ID = id + } +} + +func registerMetrics() { + mu.Lock() + defer mu.Unlock() + + if opsCounter == nil { + opsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: fmt.Sprintf("%srequest_total", DefaultMetricPrefix), + Help: "Requests processed, partitioned by endpoint and status", + }, + []string{ + fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"), + }, + ) + } + + if timeCounterSummary == nil { + timeCounterSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: fmt.Sprintf("%slatency_microseconds", DefaultMetricPrefix), + Help: "Request latencies in microseconds, partitioned by endpoint", + }, + []string{ + fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"), + }, + ) + } + + if timeCounterHistogram == nil { + timeCounterHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: fmt.Sprintf("%srequest_duration_seconds", DefaultMetricPrefix), + Help: "Request time in seconds, partitioned by endpoint", + }, + []string{ + fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"), + fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"), + }, + ) + } + + for _, collector := range []prometheus.Collector{opsCounter, timeCounterSummary, timeCounterHistogram} { + if err := prometheus.DefaultRegisterer.Register(collector); err != nil { + // if already registered, skip fatal + if _, ok := err.(prometheus.AlreadyRegisteredError); !ok { + logger.Fatal(err) + } + } + } + +} + +type wrapper struct { + options Options + callFunc client.CallFunc client.Client } -func getLabels(opts ...server.Option) prometheus.Labels { - sopts := server.Options{} +func NewClientWrapper(opts ...Option) client.Wrapper { + registerMetrics() + options := Options{} for _, opt := range opts { - opt(&sopts) + opt(&options) } - labels := make(prometheus.Labels, len(sopts.Metadata)) - for k, v := range sopts.Metadata { - labels[fmt.Sprintf("%s_%s", DefaultLabelPrefix, k)] = v - } - if len(sopts.Name) > 0 { - labels[fmt.Sprintf("%s_%s", DefaultLabelPrefix, "name")] = sopts.Name - } - if len(sopts.Id) > 0 { - labels[fmt.Sprintf("%s_%s", DefaultLabelPrefix, "id")] = sopts.Id - } - if len(sopts.Version) > 0 { - labels[fmt.Sprintf("%s_%s", DefaultLabelPrefix, "version")] = sopts.Version - } - - return labels -} - -func NewClientWrapper() client.Wrapper { return func(c client.Client) client.Client { - opsCounter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: fmt.Sprintf("%s_client_request_total", DefaultMetricPrefix), - Help: "How many requests called, partitioned by method and status", - }, - []string{"method", "status"}, - ) - - timeCounterSummary := prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: fmt.Sprintf("%s_client_latency_microseconds", DefaultMetricPrefix), - Help: "Service client request latencies in microseconds", - }, - []string{"method"}, - ) - - timeCounterHistogram := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: fmt.Sprintf("%s_client_request_duration_seconds", DefaultMetricPrefix), - Help: "Service client request time in seconds", - }, - []string{"method"}, - ) - - for _, collector := range []prometheus.Collector{opsCounter, timeCounterSummary, timeCounterHistogram} { - if err := prometheus.DefaultRegisterer.Register(collector); err != nil { - // if already registered, skip fatal - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - collector = are.ExistingCollector - } else { - logger.Fatal(err) - } - } - } - handler := &wrapper{ - timeCounterHistogram: timeCounterHistogram, - timeCounterSummary: timeCounterSummary, - opsCounter: opsCounter, - Client: c, + options: options, + Client: c, } return handler } } -func NewCallWrapper() client.CallWrapper { +func NewCallWrapper(opts ...Option) client.CallWrapper { + registerMetrics() + + options := Options{} + for _, opt := range opts { + opt(&options) + } + return func(fn client.CallFunc) client.CallFunc { - opsCounter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: fmt.Sprintf("%s_call_request_total", DefaultMetricPrefix), - Help: "How many requests called, partitioned by method and status", - }, - []string{"method", "status"}, - ) - - timeCounterSummary := prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: fmt.Sprintf("%s_call_latency_microseconds", DefaultMetricPrefix), - Help: "Service client request latencies in microseconds", - }, - []string{"method"}, - ) - - timeCounterHistogram := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: fmt.Sprintf("%s_call_request_duration_seconds", DefaultMetricPrefix), - Help: "Service client request time in seconds", - }, - []string{"method"}, - ) - - for _, collector := range []prometheus.Collector{opsCounter, timeCounterSummary, timeCounterHistogram} { - if err := prometheus.DefaultRegisterer.Register(collector); err != nil { - // if already registered, skip fatal - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - collector = are.ExistingCollector - } else { - logger.Fatal(err) - } - } - } - handler := &wrapper{ - timeCounterHistogram: timeCounterHistogram, - timeCounterSummary: timeCounterSummary, - opsCounter: opsCounter, - callFunc: fn, + options: options, + callFunc: fn, } return handler.CallFunc @@ -147,20 +155,20 @@ func NewCallWrapper() client.CallWrapper { } func (w *wrapper) CallFunc(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { - name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { us := v * 1000000 // make microseconds - w.timeCounterSummary.WithLabelValues(name).Observe(us) - w.timeCounterHistogram.WithLabelValues(name).Observe(v) + timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us) + timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v) })) defer timer.ObserveDuration() err := w.callFunc(ctx, node, req, rsp, opts) if err == nil { - w.opsCounter.WithLabelValues(name, "success").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc() } else { - w.opsCounter.WithLabelValues(name, "fail").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc() } return err @@ -168,110 +176,75 @@ func (w *wrapper) CallFunc(ctx context.Context, node *registry.Node, req client. } func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { - name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { us := v * 1000000 // make microseconds - w.timeCounterSummary.WithLabelValues(name).Observe(us) - w.timeCounterHistogram.WithLabelValues(name).Observe(v) + timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us) + timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v) })) defer timer.ObserveDuration() err := w.Client.Call(ctx, req, rsp, opts...) if err == nil { - w.opsCounter.WithLabelValues(name, "success").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc() } else { - w.opsCounter.WithLabelValues(name, "fail").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc() } return err } func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { - name := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { us := v * 1000000 // make microseconds - w.timeCounterSummary.WithLabelValues(name).Observe(us) - w.timeCounterHistogram.WithLabelValues(name).Observe(v) + timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us) + timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v) })) defer timer.ObserveDuration() stream, err := w.Client.Stream(ctx, req, opts...) if err == nil { - w.opsCounter.WithLabelValues(name, "success").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc() } else { - w.opsCounter.WithLabelValues(name, "fail").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc() } return stream, err } func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - name := p.Topic() + endpoint := p.Topic() timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { us := v * 1000000 // make microseconds - w.timeCounterSummary.WithLabelValues(name).Observe(us) - w.timeCounterHistogram.WithLabelValues(name).Observe(v) + timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us) + timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v) })) defer timer.ObserveDuration() err := w.Client.Publish(ctx, p, opts...) if err == nil { - w.opsCounter.WithLabelValues(name, "success").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc() } else { - w.opsCounter.WithLabelValues(name, "fail").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc() } return err } -func NewHandlerWrapper(opts ...server.Option) server.HandlerWrapper { - labels := getLabels(opts...) +func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { + registerMetrics() - opsCounter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - ConstLabels: labels, - Name: fmt.Sprintf("%s_handler_request_total", DefaultMetricPrefix), - Help: "How many requests processed, partitioned by method and status", - }, - []string{"method", "status"}, - ) - - timeCounterSummary := prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - ConstLabels: labels, - Name: fmt.Sprintf("%s_handler_latency_microseconds", DefaultMetricPrefix), - Help: "Service handler request latencies in microseconds", - }, - []string{"method"}, - ) - - timeCounterHistogram := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - ConstLabels: labels, - Name: fmt.Sprintf("%s_handler_request_duration_seconds", DefaultMetricPrefix), - Help: "Service handler request time in seconds", - }, - []string{"method"}, - ) - - for _, collector := range []prometheus.Collector{opsCounter, timeCounterSummary, timeCounterHistogram} { - if err := prometheus.DefaultRegisterer.Register(collector); err != nil { - // if already registered, skip fatal - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - collector = are.ExistingCollector - } else { - logger.Fatal(err) - } - } + options := Options{} + for _, opt := range opts { + opt(&options) } handler := &wrapper{ - timeCounterHistogram: timeCounterHistogram, - timeCounterSummary: timeCounterSummary, - opsCounter: opsCounter, + options: options, } return handler.HandlerFunc @@ -279,71 +252,36 @@ func NewHandlerWrapper(opts ...server.Option) server.HandlerWrapper { func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { return func(ctx context.Context, req server.Request, rsp interface{}) error { - name := req.Endpoint() + endpoint := req.Endpoint() timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { us := v * 1000000 // make microseconds - w.timeCounterSummary.WithLabelValues(name).Observe(us) - w.timeCounterHistogram.WithLabelValues(name).Observe(v) + timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us) + timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v) })) defer timer.ObserveDuration() err := fn(ctx, req, rsp) if err == nil { - w.opsCounter.WithLabelValues(name, "success").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc() } else { - w.opsCounter.WithLabelValues(name, "fail").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc() } return err } } -func NewSubscriberWrapper(opts ...server.Option) server.SubscriberWrapper { - labels := getLabels(opts...) +func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { + registerMetrics() - opsCounter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - ConstLabels: labels, - Name: fmt.Sprintf("%s_subscriber_request_total", DefaultMetricPrefix), - Help: "How many requests processed, partitioned by topic and status", - }, - []string{"topic", "status"}, - ) - - timeCounterSummary := prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - ConstLabels: labels, - Name: fmt.Sprintf("%s_subscriber_latency_microseconds", DefaultMetricPrefix), - Help: "Service subscriber request latencies in microseconds", - }, - []string{"topic"}, - ) - - timeCounterHistogram := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - ConstLabels: labels, - Name: fmt.Sprintf("%s_subscriber_request_duration_seconds", DefaultMetricPrefix), - Help: "Service subscriber request time in seconds", - }, - []string{"topic"}, - ) - - for _, collector := range []prometheus.Collector{opsCounter, timeCounterSummary, timeCounterHistogram} { - if err := prometheus.DefaultRegisterer.Register(collector); err != nil { - // if already registered, skip fatal - if are, ok := err.(prometheus.AlreadyRegisteredError); ok { - collector = are.ExistingCollector - } else { - logger.Fatal(err) - } - } + options := Options{} + for _, opt := range opts { + opt(&options) } handler := &wrapper{ - timeCounterHistogram: timeCounterHistogram, - timeCounterSummary: timeCounterSummary, - opsCounter: opsCounter, + options: options, } return handler.SubscriberFunc @@ -351,20 +289,20 @@ func NewSubscriberWrapper(opts ...server.Option) server.SubscriberWrapper { func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { return func(ctx context.Context, msg server.Message) error { - topic := msg.Topic() + endpoint := msg.Topic() timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { us := v * 1000000 // make microseconds - w.timeCounterSummary.WithLabelValues(topic).Observe(us) - w.timeCounterHistogram.WithLabelValues(topic).Observe(v) + timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us) + timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v) })) defer timer.ObserveDuration() err := fn(ctx, msg) if err == nil { - w.opsCounter.WithLabelValues(topic, "success").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc() } else { - w.opsCounter.WithLabelValues(topic, "fail").Inc() + opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc() } return err diff --git a/prometheus_test.go b/prometheus_test.go index cd7ed75..da9301e 100644 --- a/prometheus_test.go +++ b/prometheus_test.go @@ -45,10 +45,6 @@ func TestPrometheusMetrics(t *testing.T) { id := "id-1234567890" version := "1.2.3.4" - md := make(map[string]string) - md["dc"] = "dc1" - md["node"] = "node1" - c := client.NewClient(client.Selector(sel)) s := server.NewServer( server.Name(name), @@ -58,10 +54,9 @@ func TestPrometheusMetrics(t *testing.T) { server.Broker(brk), server.WrapHandler( promwrapper.NewHandlerWrapper( - server.Metadata(md), - server.Name(name), - server.Version(version), - server.Id(id), + promwrapper.ServiceName(name), + promwrapper.ServiceVersion(version), + promwrapper.ServiceID(id), ), ), ) @@ -90,7 +85,7 @@ func TestPrometheusMetrics(t *testing.T) { list, _ := prometheus.DefaultGatherer.Gather() - metric := findMetricByName(list, dto.MetricType_SUMMARY, "micro_handler_latency_microseconds") + metric := findMetricByName(list, dto.MetricType_SUMMARY, "micro_latency_microseconds") if metric == nil || metric.Metric == nil || len(metric.Metric) == 0 { t.Fatalf("no metrics returned") @@ -98,17 +93,13 @@ func TestPrometheusMetrics(t *testing.T) { for _, v := range metric.Metric[0].Label { switch *v.Name { - case "micro_dc": - assert.Equal(t, "dc1", *v.Value) - case "micro_node": - assert.Equal(t, "node1", *v.Value) case "micro_version": assert.Equal(t, version, *v.Value) case "micro_id": assert.Equal(t, id, *v.Value) case "micro_name": assert.Equal(t, name, *v.Value) - case "method": + case "micro_endpoint": assert.Equal(t, "Test.Method", *v.Value) default: t.Fatalf("unknown %v with %v", *v.Name, *v.Value) @@ -118,21 +109,17 @@ func TestPrometheusMetrics(t *testing.T) { assert.Equal(t, uint64(2), *metric.Metric[0].Summary.SampleCount) assert.True(t, *metric.Metric[0].Summary.SampleSum > 0) - metric = findMetricByName(list, dto.MetricType_HISTOGRAM, "micro_handler_request_duration_seconds") + metric = findMetricByName(list, dto.MetricType_HISTOGRAM, "micro_request_duration_seconds") for _, v := range metric.Metric[0].Label { switch *v.Name { - case "micro_dc": - assert.Equal(t, "dc1", *v.Value) - case "micro_node": - assert.Equal(t, "node1", *v.Value) case "micro_version": assert.Equal(t, version, *v.Value) case "micro_id": assert.Equal(t, id, *v.Value) case "micro_name": assert.Equal(t, name, *v.Value) - case "method": + case "micro_endpoint": assert.Equal(t, "Test.Method", *v.Value) default: t.Fatalf("unknown %v with %v", *v.Name, *v.Value) @@ -142,43 +129,35 @@ func TestPrometheusMetrics(t *testing.T) { assert.Equal(t, uint64(2), *metric.Metric[0].Histogram.SampleCount) assert.True(t, *metric.Metric[0].Histogram.SampleSum > 0) - metric = findMetricByName(list, dto.MetricType_COUNTER, "micro_handler_request_total") + metric = findMetricByName(list, dto.MetricType_COUNTER, "micro_request_total") for _, v := range metric.Metric[0].Label { switch *v.Name { - case "micro_dc": - assert.Equal(t, "dc1", *v.Value) - case "micro_node": - assert.Equal(t, "node1", *v.Value) case "micro_version": assert.Equal(t, version, *v.Value) case "micro_id": assert.Equal(t, id, *v.Value) case "micro_name": assert.Equal(t, name, *v.Value) - case "method": + case "micro_endpoint": assert.Equal(t, "Test.Method", *v.Value) - case "status": - assert.Equal(t, "fail", *v.Value) + case "micro_status": + assert.Equal(t, "failure", *v.Value) } } assert.Equal(t, *metric.Metric[0].Counter.Value, float64(1)) for _, v := range metric.Metric[1].Label { switch *v.Name { - case "dc": - assert.Equal(t, "dc1", *v.Value) - case "node": - assert.Equal(t, "node1", *v.Value) case "micro_version": assert.Equal(t, version, *v.Value) case "micro_id": assert.Equal(t, id, *v.Value) case "micro_name": assert.Equal(t, name, *v.Value) - case "method": + case "micro_endpoint": assert.Equal(t, "Test.Method", *v.Value) - case "status": + case "micro_status": assert.Equal(t, "success", *v.Value) } }