commit 8ab7bf693dd609bc2d860496711eaec5cce26434 Author: Vasiliy Tolstov Date: Wed Dec 4 01:49:07 2019 +0300 add victoria metrics monitoring wrapper Signed-off-by: Vasiliy Tolstov diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd0abbe --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +# VictoriaMetrics + +Wrappers are a form of middleware that can be used with go-micro services. They can wrap both the Client and Server handlers. +This plugin implements the HandlerWrapper interface to provide automatic prometheus metric handling +for each microservice method execution time and operation count for success and failed cases. + +This handler will export two metrics to prometheus: +* **micro_request_total**. How many go-miro requests processed, partitioned by method and status. +* **micro_request_duration_microseconds**. Service method request latencies in microseconds, partitioned by method. + +# Usage + +When creating your service, add the wrapper like so. + +```go + service := micro.NewService( + micro.Name("service name"), + micro.Version("latest"), + micro.WrapHandler(victoriametrics.NewHandlerWrapper()), + ) + + service.Init() +``` diff --git a/victoriametrics.go b/victoriametrics.go new file mode 100644 index 0000000..2edf4d7 --- /dev/null +++ b/victoriametrics.go @@ -0,0 +1,82 @@ +package victoriametrics + +import ( + "context" + "fmt" + "strings" + "time" + + metrics "github.com/VictoriaMetrics/metrics" + "github.com/micro/go-micro/server" +) + +var ( + defaultMetricPrefix = "micro" + metaLabels []string +) + +func getName(name string, md map[string]interface{}) string { + labels := make([]string, 0, len(metaLabels)+len(md)) + labels = append(labels, metaLabels...) + + for k, v := range md { + labels = append(labels, fmt.Sprintf(`%s="%v"`, k, v)) + } + + if len(labels) > 0 { + return fmt.Sprintf(`%s_%s{%s}`, defaultMetricPrefix, name, strings.Join(labels, ",")) + } + return fmt.Sprintf(`%s_%s`, defaultMetricPrefix, name) +} + +func NewHandlerWrapper(opts ...server.Option) server.HandlerWrapper { + sopts := server.Options{} + + for _, opt := range opts { + opt(&sopts) + } + + metadata := make(map[string]string, len(sopts.Metadata)) + for k, v := range sopts.Metadata { + metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, k)] = v + } + if len(sopts.Name) > 0 { + metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "name")] = sopts.Name + } + if len(sopts.Id) > 0 { + metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "id")] = sopts.Id + } + if len(sopts.Version) > 0 { + metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "version")] = sopts.Version + } + metaLabels = make([]string, 0, len(metadata)) + for k, v := range metadata { + metaLabels = append(metaLabels, fmt.Sprintf(`%s="%v"`, k, v)) + } + + return func(fn server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req server.Request, rsp interface{}) error { + name := req.Endpoint() + timeCounterSummary := metrics.GetOrCreateSummary( + getName("upstream_latency_seconds", map[string]interface{}{"method": name}), + ) + timeCounterHistogram := metrics.GetOrCreateSummary( + getName("request_duration_seconds", map[string]interface{}{"method": name}), + ) + + ts := time.Now() + err := fn(ctx, req, rsp) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("request_total", map[string]interface{}{"method": name, "status": "success"})).Inc() + } else { + metrics.GetOrCreateCounter(getName("request_total", map[string]interface{}{"method": name, "status": "failure"})).Inc() + } + + return err + } + } +} diff --git a/victoriametrics_test.go b/victoriametrics_test.go new file mode 100644 index 0000000..c7b4d0d --- /dev/null +++ b/victoriametrics_test.go @@ -0,0 +1,155 @@ +package victoriametrics + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "strings" + "testing" + + metrics "github.com/VictoriaMetrics/metrics" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/client/selector" + "github.com/micro/go-micro/registry/memory" + "github.com/micro/go-micro/server" + "github.com/stretchr/testify/assert" +) + +type Test interface { + Method(ctx context.Context, in *TestRequest, opts ...client.CallOption) (*TestResponse, error) +} + +type TestRequest struct { + IsError bool +} +type TestResponse struct{} + +type testHandler struct{} + +func (t *testHandler) Method(ctx context.Context, req *TestRequest, rsp *TestResponse) error { + if req.IsError { + return fmt.Errorf("test error") + } + return nil +} + +func TestVictoriametrics(t *testing.T) { + // setup + registry := memory.NewRegistry() + sel := selector.NewSelector(selector.Registry(registry)) + + name := "test" + id := "id-1234567890" + version := "1.2.3.4" + + md := make(map[string]string) + md["dc"] = "dc1" + md["node"] = "node1" + + c := client.NewClient(client.Selector(sel)) + s := server.NewServer( + server.Name(name), + server.Version(version), + server.Id(id), + server.Registry(registry), + server.WrapHandler( + NewHandlerWrapper( + server.Metadata(md), + server.Name(name), + server.Version(version), + server.Id(id), + ), + ), + ) + + defer s.Stop() + + type Test struct { + *testHandler + } + + s.Handle( + s.NewHandler(&Test{new(testHandler)}), + ) + + if err := s.Start(); err != nil { + t.Fatalf("Unexpected error starting server: %v", err) + } + + req := c.NewRequest(name, "Test.Method", &TestRequest{IsError: false}, client.WithContentType("application/json")) + rsp := TestResponse{} + + assert.NoError(t, c.Call(context.TODO(), req, &rsp)) + + req = c.NewRequest(name, "Test.Method", &TestRequest{IsError: true}, client.WithContentType("application/json")) + assert.Error(t, c.Call(context.TODO(), req, &rsp)) + + buf := bytes.NewBuffer(nil) + metrics.WritePrometheus(buf, false) + + metric, err := findMetricByName(buf, "sum", "micro_request_total") + if err != nil { + t.Fatal(err) + } + + labels := metric[0]["labels"].(map[string]string) + for k, v := range labels { + switch k { + case "micro_dc": + assert.Equal(t, "dc1", v) + case "micro_node": + assert.Equal(t, "node1", v) + case "micro_version": + assert.Equal(t, version, v) + case "micro_id": + assert.Equal(t, id, v) + case "micro_name": + assert.Equal(t, name, v) + case "method": + assert.Equal(t, "Test.Method", v) + case "quantile", "status": + continue + default: + t.Fatalf("unknown %v with %v", k, v) + } + } +} + +func findMetricByName(buf io.Reader, tp string, name string) ([]map[string]interface{}, error) { + var metrics []map[string]interface{} + scanner := bufio.NewScanner(buf) + for scanner.Scan() { + txt := scanner.Text() + if strings.HasPrefix(txt, name) { + mt := make(map[string]interface{}) + v := txt[strings.LastIndex(txt, " "):] + k := "" + if idx := strings.Index(txt, "{"); idx > 0 { + labels := make(map[string]string) + lb := strings.Split(txt[idx+1:strings.Index(txt, "}")], ",") + for _, l := range lb { + p := strings.Split(l, "=") + labels[strings.Trim(p[0], `"`)] = strings.Trim(p[1], `"`) + } + mt["labels"] = labels + k = txt[:idx] + } else { + k = txt[:strings.Index(txt, " ")] + } + mt["name"] = k + mt["value"] = v + metrics = append(metrics, mt) + } + } + + if err := scanner.Err(); err != nil { + return nil, err + } + + if len(metrics) == 0 { + return nil, fmt.Errorf("%s %s not found", tp, name) + } + return metrics, nil +}