meter: initial wrapper import #19

Merged
vtolstov merged 1 commits from meter-wrapper into master 2021-02-18 14:35:10 +03:00
2 changed files with 87 additions and 74 deletions

View File

@ -63,15 +63,6 @@ func Address(value string) Option {
} }
} }
/*
// Labels be added to every metric
func Labels(labels []string) Option {
return func(o *Options) {
o.Labels = labels
}
}
*/
/* /*
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics: // TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option { func TimingObjectives(value map[float64]float64) Option {

View File

@ -1,5 +1,3 @@
// +build ignore
package wrapper package wrapper
import ( import (
@ -12,6 +10,26 @@ import (
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
var (
ClientRequestDurationSeconds = "client_request_duration_seconds"
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
ClientRequestTotal = "client_request_total"
ServerRequestDurationSeconds = "server_request_duration_seconds"
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
ServerRequestTotal = "server_request_total"
PublishMessageDurationSeconds = "publish_message_duration_seconds"
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
PublishMessageTotal = "publish_message_total"
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
SubscribeMessageTotal = "subscribe_message_total"
labelSuccess = "success"
labelFailure = "failure"
labelStatus = "status"
labelEndpoint = "endpoint"
)
type Options struct { type Options struct {
Meter meter.Meter Meter meter.Meter
Name string Name string
@ -21,6 +39,14 @@ type Options struct {
type Option func(*Options) type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{}
for _, o := range opts {
o(&options)
}
return options
}
func ServiceName(name string) Option { func ServiceName(name string) Option {
return func(o *Options) { return func(o *Options) {
o.Name = name o.Name = name
@ -46,7 +72,7 @@ func Meter(m meter.Meter) Option {
} }
type wrapper struct { type wrapper struct {
options Options opts Options
callFunc client.CallFunc callFunc client.CallFunc
client.Client client.Client
} }
@ -54,178 +80,174 @@ type wrapper struct {
func NewClientWrapper(opts ...Option) client.Wrapper { func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client { return func(c client.Client) client.Client {
handler := &wrapper{ handler := &wrapper{
labels: labels, opts: NewOptions(opts...),
Client: c, Client: c,
} }
return handler return handler
} }
} }
func NewCallWrapper(opts ...Option) client.CallWrapper { func NewCallWrapper(opts ...Option) client.CallWrapper {
labels := getLabels(opts...)
return func(fn client.CallFunc) client.CallFunc { return func(fn client.CallFunc) client.CallFunc {
handler := &wrapper{ handler := &wrapper{
labels: labels, opts: NewOptions(opts...),
callFunc: fn, callFunc: fn,
} }
return handler.CallFunc return handler.CallFunc
} }
} }
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now() ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts) err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts) te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds())) lopts := make([]meter.Option, 0, 2)
timeCounterHistogram.Update(te.Seconds()) lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else { } else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelFailure))
} }
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
return err return err
} }
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now() ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...) err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts) te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds())) lopts := make([]meter.Option, 0, 2)
timeCounterHistogram.Update(te.Seconds()) lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else { } else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelFailure))
} }
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
return err return err
} }
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now() ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...) stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts) te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds())) lopts := make([]meter.Option, 0, 2)
timeCounterHistogram.Update(te.Seconds()) lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else { } else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelFailure))
} }
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
return stream, err return stream, err
} }
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic() endpoint := p.Topic()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("publish_message_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("publish_message_duration_seconds", wlabels))
ts := time.Now() ts := time.Now()
err := w.Client.Publish(ctx, p, opts...) err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts) te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds())) lopts := make([]meter.Option, 0, 2)
timeCounterHistogram.Update(te.Seconds()) lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else { } else {
metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelFailure))
} }
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
return err return err
} }
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
labels := getLabels(opts...)
handler := &wrapper{ handler := &wrapper{
labels: labels, opts: NewOptions(opts...),
} }
return handler.HandlerFunc return handler.HandlerFunc
} }
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error { return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint() endpoint := req.Endpoint()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("server_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("server_request_duration_seconds", wlabels))
ts := time.Now() ts := time.Now()
err := fn(ctx, req, rsp) err := fn(ctx, req, rsp)
te := time.Since(ts) te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds())) lopts := make([]meter.Option, 0, 2)
timeCounterHistogram.Update(te.Seconds()) lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else { } else {
metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelFailure))
} }
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
return err return err
} }
} }
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
labels := getLabels(opts...)
handler := &wrapper{ handler := &wrapper{
labels: labels, opts: NewOptions(opts...),
} }
return handler.SubscriberFunc return handler.SubscriberFunc
} }
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error { return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic() endpoint := msg.Topic()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("subscribe_message_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("subscribe_message_duration_seconds", wlabels))
ts := time.Now() ts := time.Now()
err := fn(ctx, msg) err := fn(ctx, msg)
te := time.Since(ts) te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds())) lopts := make([]meter.Option, 0, 2)
timeCounterHistogram.Update(te.Seconds()) lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil { if err == nil {
metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else { } else {
metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() lopts = append(lopts, meter.Label(labelStatus, labelFailure))
} }
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()
return err return err
} }