add victoria metrics monitoring wrapper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
commit
8ab7bf693d
23
README.md
Normal file
23
README.md
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# VictoriaMetrics
|
||||||
|
|
||||||
|
Wrappers are a form of middleware that can be used with go-micro services. They can wrap both the Client and Server handlers.
|
||||||
|
This plugin implements the HandlerWrapper interface to provide automatic prometheus metric handling
|
||||||
|
for each microservice method execution time and operation count for success and failed cases.
|
||||||
|
|
||||||
|
This handler will export two metrics to prometheus:
|
||||||
|
* **micro_request_total**. How many go-miro requests processed, partitioned by method and status.
|
||||||
|
* **micro_request_duration_microseconds**. Service method request latencies in microseconds, partitioned by method.
|
||||||
|
|
||||||
|
# Usage
|
||||||
|
|
||||||
|
When creating your service, add the wrapper like so.
|
||||||
|
|
||||||
|
```go
|
||||||
|
service := micro.NewService(
|
||||||
|
micro.Name("service name"),
|
||||||
|
micro.Version("latest"),
|
||||||
|
micro.WrapHandler(victoriametrics.NewHandlerWrapper()),
|
||||||
|
)
|
||||||
|
|
||||||
|
service.Init()
|
||||||
|
```
|
82
victoriametrics.go
Normal file
82
victoriametrics.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
package victoriametrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
metrics "github.com/VictoriaMetrics/metrics"
|
||||||
|
"github.com/micro/go-micro/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultMetricPrefix = "micro"
|
||||||
|
metaLabels []string
|
||||||
|
)
|
||||||
|
|
||||||
|
func getName(name string, md map[string]interface{}) string {
|
||||||
|
labels := make([]string, 0, len(metaLabels)+len(md))
|
||||||
|
labels = append(labels, metaLabels...)
|
||||||
|
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, fmt.Sprintf(`%s="%v"`, k, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(labels) > 0 {
|
||||||
|
return fmt.Sprintf(`%s_%s{%s}`, defaultMetricPrefix, name, strings.Join(labels, ","))
|
||||||
|
}
|
||||||
|
return fmt.Sprintf(`%s_%s`, defaultMetricPrefix, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHandlerWrapper(opts ...server.Option) server.HandlerWrapper {
|
||||||
|
sopts := server.Options{}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(&sopts)
|
||||||
|
}
|
||||||
|
|
||||||
|
metadata := make(map[string]string, len(sopts.Metadata))
|
||||||
|
for k, v := range sopts.Metadata {
|
||||||
|
metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, k)] = v
|
||||||
|
}
|
||||||
|
if len(sopts.Name) > 0 {
|
||||||
|
metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "name")] = sopts.Name
|
||||||
|
}
|
||||||
|
if len(sopts.Id) > 0 {
|
||||||
|
metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "id")] = sopts.Id
|
||||||
|
}
|
||||||
|
if len(sopts.Version) > 0 {
|
||||||
|
metadata[fmt.Sprintf("%s_%s", defaultMetricPrefix, "version")] = sopts.Version
|
||||||
|
}
|
||||||
|
metaLabels = make([]string, 0, len(metadata))
|
||||||
|
for k, v := range metadata {
|
||||||
|
metaLabels = append(metaLabels, fmt.Sprintf(`%s="%v"`, k, v))
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(fn server.HandlerFunc) server.HandlerFunc {
|
||||||
|
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||||
|
name := req.Endpoint()
|
||||||
|
timeCounterSummary := metrics.GetOrCreateSummary(
|
||||||
|
getName("upstream_latency_seconds", map[string]interface{}{"method": name}),
|
||||||
|
)
|
||||||
|
timeCounterHistogram := metrics.GetOrCreateSummary(
|
||||||
|
getName("request_duration_seconds", map[string]interface{}{"method": name}),
|
||||||
|
)
|
||||||
|
|
||||||
|
ts := time.Now()
|
||||||
|
err := fn(ctx, req, rsp)
|
||||||
|
te := time.Since(ts)
|
||||||
|
|
||||||
|
timeCounterSummary.Update(float64(te.Seconds()))
|
||||||
|
timeCounterHistogram.Update(te.Seconds())
|
||||||
|
if err == nil {
|
||||||
|
metrics.GetOrCreateCounter(getName("request_total", map[string]interface{}{"method": name, "status": "success"})).Inc()
|
||||||
|
} else {
|
||||||
|
metrics.GetOrCreateCounter(getName("request_total", map[string]interface{}{"method": name, "status": "failure"})).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
155
victoriametrics_test.go
Normal file
155
victoriametrics_test.go
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
package victoriametrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
metrics "github.com/VictoriaMetrics/metrics"
|
||||||
|
"github.com/micro/go-micro/client"
|
||||||
|
"github.com/micro/go-micro/client/selector"
|
||||||
|
"github.com/micro/go-micro/registry/memory"
|
||||||
|
"github.com/micro/go-micro/server"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Test interface {
|
||||||
|
Method(ctx context.Context, in *TestRequest, opts ...client.CallOption) (*TestResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestRequest struct {
|
||||||
|
IsError bool
|
||||||
|
}
|
||||||
|
type TestResponse struct{}
|
||||||
|
|
||||||
|
type testHandler struct{}
|
||||||
|
|
||||||
|
func (t *testHandler) Method(ctx context.Context, req *TestRequest, rsp *TestResponse) error {
|
||||||
|
if req.IsError {
|
||||||
|
return fmt.Errorf("test error")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVictoriametrics(t *testing.T) {
|
||||||
|
// setup
|
||||||
|
registry := memory.NewRegistry()
|
||||||
|
sel := selector.NewSelector(selector.Registry(registry))
|
||||||
|
|
||||||
|
name := "test"
|
||||||
|
id := "id-1234567890"
|
||||||
|
version := "1.2.3.4"
|
||||||
|
|
||||||
|
md := make(map[string]string)
|
||||||
|
md["dc"] = "dc1"
|
||||||
|
md["node"] = "node1"
|
||||||
|
|
||||||
|
c := client.NewClient(client.Selector(sel))
|
||||||
|
s := server.NewServer(
|
||||||
|
server.Name(name),
|
||||||
|
server.Version(version),
|
||||||
|
server.Id(id),
|
||||||
|
server.Registry(registry),
|
||||||
|
server.WrapHandler(
|
||||||
|
NewHandlerWrapper(
|
||||||
|
server.Metadata(md),
|
||||||
|
server.Name(name),
|
||||||
|
server.Version(version),
|
||||||
|
server.Id(id),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
type Test struct {
|
||||||
|
*testHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Handle(
|
||||||
|
s.NewHandler(&Test{new(testHandler)}),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := s.Start(); err != nil {
|
||||||
|
t.Fatalf("Unexpected error starting server: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := c.NewRequest(name, "Test.Method", &TestRequest{IsError: false}, client.WithContentType("application/json"))
|
||||||
|
rsp := TestResponse{}
|
||||||
|
|
||||||
|
assert.NoError(t, c.Call(context.TODO(), req, &rsp))
|
||||||
|
|
||||||
|
req = c.NewRequest(name, "Test.Method", &TestRequest{IsError: true}, client.WithContentType("application/json"))
|
||||||
|
assert.Error(t, c.Call(context.TODO(), req, &rsp))
|
||||||
|
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
metrics.WritePrometheus(buf, false)
|
||||||
|
|
||||||
|
metric, err := findMetricByName(buf, "sum", "micro_request_total")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
labels := metric[0]["labels"].(map[string]string)
|
||||||
|
for k, v := range labels {
|
||||||
|
switch k {
|
||||||
|
case "micro_dc":
|
||||||
|
assert.Equal(t, "dc1", v)
|
||||||
|
case "micro_node":
|
||||||
|
assert.Equal(t, "node1", v)
|
||||||
|
case "micro_version":
|
||||||
|
assert.Equal(t, version, v)
|
||||||
|
case "micro_id":
|
||||||
|
assert.Equal(t, id, v)
|
||||||
|
case "micro_name":
|
||||||
|
assert.Equal(t, name, v)
|
||||||
|
case "method":
|
||||||
|
assert.Equal(t, "Test.Method", v)
|
||||||
|
case "quantile", "status":
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
t.Fatalf("unknown %v with %v", k, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func findMetricByName(buf io.Reader, tp string, name string) ([]map[string]interface{}, error) {
|
||||||
|
var metrics []map[string]interface{}
|
||||||
|
scanner := bufio.NewScanner(buf)
|
||||||
|
for scanner.Scan() {
|
||||||
|
txt := scanner.Text()
|
||||||
|
if strings.HasPrefix(txt, name) {
|
||||||
|
mt := make(map[string]interface{})
|
||||||
|
v := txt[strings.LastIndex(txt, " "):]
|
||||||
|
k := ""
|
||||||
|
if idx := strings.Index(txt, "{"); idx > 0 {
|
||||||
|
labels := make(map[string]string)
|
||||||
|
lb := strings.Split(txt[idx+1:strings.Index(txt, "}")], ",")
|
||||||
|
for _, l := range lb {
|
||||||
|
p := strings.Split(l, "=")
|
||||||
|
labels[strings.Trim(p[0], `"`)] = strings.Trim(p[1], `"`)
|
||||||
|
}
|
||||||
|
mt["labels"] = labels
|
||||||
|
k = txt[:idx]
|
||||||
|
} else {
|
||||||
|
k = txt[:strings.Index(txt, " ")]
|
||||||
|
}
|
||||||
|
mt["name"] = k
|
||||||
|
mt["value"] = v
|
||||||
|
metrics = append(metrics, mt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scanner.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(metrics) == 0 {
|
||||||
|
return nil, fmt.Errorf("%s %s not found", tp, name)
|
||||||
|
}
|
||||||
|
return metrics, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user