From 1154db66d03778fa53ed6e5b4841ded9a63f6027 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 20 Jun 2020 17:09:22 +0300 Subject: [PATCH] wrapper/monitoring/victoriametrics: align with prometheus metrics Signed-off-by: Vasiliy Tolstov --- victoriametrics.go | 283 ++++++++++++++++++++++++++++++++-------- victoriametrics_test.go | 19 +-- 2 files changed, 234 insertions(+), 68 deletions(-) diff --git a/victoriametrics.go b/victoriametrics.go index e89d08a..25546d3 100644 --- a/victoriametrics.go +++ b/victoriametrics.go @@ -7,76 +7,251 @@ import ( "time" metrics "github.com/VictoriaMetrics/metrics" + "github.com/micro/go-micro/v2/client" + "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/server" ) var ( - defaultMetricPrefix = "micro" - metaLabels []string + // default metric prefix + DefaultMetricPrefix = "micro_" + // default label prefix + DefaultLabelPrefix = "micro_" ) -func getName(name string, md map[string]interface{}) string { - labels := make([]string, 0, len(metaLabels)+len(md)) - labels = append(labels, metaLabels...) - - for k, v := range md { - labels = append(labels, fmt.Sprintf(`%s="%v"`, k, v)) - } - - if len(labels) > 0 { - return fmt.Sprintf(`%s_%s{%s}`, defaultMetricPrefix, name, strings.Join(labels, ",")) - } - return fmt.Sprintf(`%s_%s`, defaultMetricPrefix, name) +type Options struct { + Name string + Version string + ID string } -func NewHandlerWrapper(opts ...server.Option) server.HandlerWrapper { - sopts := server.Options{} +type Option func(*Options) + +func ServiceName(name string) Option { + return func(opts *Options) { + opts.Name = name + } +} + +func ServiceVersion(version string) Option { + return func(opts *Options) { + opts.Version = version + } +} + +func ServiceID(id string) Option { + return func(opts *Options) { + opts.ID = id + } +} + +func getName(name string, labels []string) string { + if len(labels) > 0 { + return fmt.Sprintf(`%s%s{%s}`, DefaultMetricPrefix, name, strings.Join(labels, ",")) + } + return fmt.Sprintf(`%s%s`, DefaultMetricPrefix, name) +} + +func getLabels(opts ...Option) []string { + options := Options{} for _, opt := range opts { - opt(&sopts) + opt(&options) } - metadata := make(map[string]string, len(sopts.Metadata)) - for k, v := range sopts.Metadata { - metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, k)] = v - } - if len(sopts.Name) > 0 { - metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "name")] = sopts.Name - } - if len(sopts.Id) > 0 { - metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "id")] = sopts.Id - } - if len(sopts.Version) > 0 { - metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "version")] = sopts.Version - } - metaLabels = make([]string, 0, len(metadata)) - for k, v := range metadata { - metaLabels = append(metaLabels, fmt.Sprintf(`%s="%v"`, k, v)) - } + labels := make([]string, 0, 3) + labels = append(labels, fmt.Sprintf(`%sname="%s"`, DefaultLabelPrefix, options.Name)) + labels = append(labels, fmt.Sprintf(`%sversion="%s"`, DefaultLabelPrefix, options.Version)) + labels = append(labels, fmt.Sprintf(`%sid="%s"`, DefaultLabelPrefix, options.ID)) - return func(fn server.HandlerFunc) server.HandlerFunc { - return func(ctx context.Context, req server.Request, rsp interface{}) error { - name := req.Endpoint() - timeCounterSummary := metrics.GetOrCreateSummary( - getName("upstream_latency_seconds", map[string]interface{}{"method": name}), - ) - timeCounterHistogram := metrics.GetOrCreateSummary( - getName("request_duration_seconds", map[string]interface{}{"method": name}), - ) + return labels +} - ts := time.Now() - err := fn(ctx, req, rsp) - te := time.Since(ts) +type wrapper struct { + options Options + callFunc client.CallFunc + client.Client + labels []string +} - timeCounterSummary.Update(float64(te.Seconds())) - timeCounterHistogram.Update(te.Seconds()) - if err == nil { - metrics.GetOrCreateCounter(getName("request_total", map[string]interface{}{"method": name, "status": "success"})).Inc() - } else { - metrics.GetOrCreateCounter(getName("request_total", map[string]interface{}{"method": name, "status": "failure"})).Inc() - } +func NewClientWrapper(opts ...Option) client.Wrapper { + labels := getLabels(opts...) - return err + return func(c client.Client) client.Client { + handler := &wrapper{ + labels: labels, + Client: c, } + + return handler + } +} + +func NewCallWrapper(opts ...Option) client.CallWrapper { + labels := getLabels(opts...) + + return func(fn client.CallFunc) client.CallFunc { + handler := &wrapper{ + labels: labels, + callFunc: fn, + } + + return handler.CallFunc + } +} + +func (w *wrapper) CallFunc(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("upstream_latency_seconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("request_duration_seconds", wlabels)) + + ts := time.Now() + err := w.callFunc(ctx, node, req, rsp, opts) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err +} + +func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("upstream_latency_seconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("request_duration_seconds", wlabels)) + + ts := time.Now() + err := w.Client.Call(ctx, req, rsp, opts...) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err +} + +func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("upstream_latency_seconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("request_duration_seconds", wlabels)) + + ts := time.Now() + stream, err := w.Client.Stream(ctx, req, opts...) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return stream, err +} + +func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + endpoint := p.Topic() + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("upstream_latency_seconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("request_duration_seconds", wlabels)) + + ts := time.Now() + err := w.Client.Publish(ctx, p, opts...) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err +} + +func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { + labels := getLabels(opts...) + + handler := &wrapper{ + labels: labels, + } + + return handler.HandlerFunc +} + +func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req server.Request, rsp interface{}) error { + endpoint := req.Endpoint() + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("upstream_latency_seconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("request_duration_seconds", wlabels)) + + ts := time.Now() + err := fn(ctx, req, rsp) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err + } +} + +func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { + labels := getLabels(opts...) + + handler := &wrapper{ + labels: labels, + } + + return handler.SubscriberFunc +} + +func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { + return func(ctx context.Context, msg server.Message) error { + endpoint := msg.Topic() + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("upstream_latency_seconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("request_duration_seconds", wlabels)) + + ts := time.Now() + err := fn(ctx, msg) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err } } diff --git a/victoriametrics_test.go b/victoriametrics_test.go index 0ed46c8..26eb733 100644 --- a/victoriametrics_test.go +++ b/victoriametrics_test.go @@ -44,10 +44,6 @@ func TestVictoriametrics(t *testing.T) { id := "id-1234567890" version := "1.2.3.4" - md := make(map[string]string) - md["dc"] = "dc1" - md["node"] = "node1" - c := client.NewClient(client.Selector(sel)) s := server.NewServer( server.Name(name), @@ -56,10 +52,9 @@ func TestVictoriametrics(t *testing.T) { server.Registry(registry), server.WrapHandler( NewHandlerWrapper( - server.Metadata(md), - server.Name(name), - server.Version(version), - server.Id(id), + ServiceName(name), + ServiceVersion(version), + ServiceID(id), ), ), ) @@ -97,19 +92,15 @@ func TestVictoriametrics(t *testing.T) { labels := metric[0]["labels"].(map[string]string) for k, v := range labels { switch k { - case "micro_dc": - assert.Equal(t, "dc1", v) - case "micro_node": - assert.Equal(t, "node1", v) case "micro_version": assert.Equal(t, version, v) case "micro_id": assert.Equal(t, id, v) case "micro_name": assert.Equal(t, name, v) - case "method": + case "micro_endpoint": assert.Equal(t, "Test.Method", v) - case "quantile", "status": + case "micro_status": continue default: t.Fatalf("unknown %v with %v", k, v)