meter: initial wrapper import #19

Merged
vtolstov merged 1 commits from meter-wrapper into master 2021-02-18 14:35:10 +03:00
2 changed files with 87 additions and 74 deletions

View File

@ -63,15 +63,6 @@ func Address(value string) Option {
}
}
/*
// Labels be added to every metric
func Labels(labels []string) Option {
return func(o *Options) {
o.Labels = labels
}
}
*/
/*
// TimingObjectives defines the desired spread of statistics for histogram / timing metrics:
func TimingObjectives(value map[float64]float64) Option {

View File

@ -1,5 +1,3 @@
// +build ignore
package wrapper
import (
@ -12,6 +10,26 @@ import (
"github.com/unistack-org/micro/v3/server"
)
var (
ClientRequestDurationSeconds = "client_request_duration_seconds"
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
ClientRequestTotal = "client_request_total"
ServerRequestDurationSeconds = "server_request_duration_seconds"
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
ServerRequestTotal = "server_request_total"
PublishMessageDurationSeconds = "publish_message_duration_seconds"
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
PublishMessageTotal = "publish_message_total"
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
SubscribeMessageTotal = "subscribe_message_total"
labelSuccess = "success"
labelFailure = "failure"
labelStatus = "status"
labelEndpoint = "endpoint"
)
type Options struct {
Meter meter.Meter
Name string
@ -21,6 +39,14 @@ type Options struct {
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{}
for _, o := range opts {
o(&options)
}
return options
}
func ServiceName(name string) Option {
return func(o *Options) {
o.Name = name
@ -46,7 +72,7 @@ func Meter(m meter.Meter) Option {
}
type wrapper struct {
options Options
opts Options
callFunc client.CallFunc
client.Client
}
@ -54,178 +80,174 @@ type wrapper struct {
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
labels: labels,
opts: NewOptions(opts...),
Client: c,
}
return handler
}
}
func NewCallWrapper(opts ...Option) client.CallWrapper {
labels := getLabels(opts...)
return func(fn client.CallFunc) client.CallFunc {
handler := &wrapper{
labels: labels,
opts: NewOptions(opts...),
callFunc: fn,
}
return handler.CallFunc
}
}
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
lopts := make([]meter.Option, 0, 2)
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
return err
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
lopts := make([]meter.Option, 0, 2)
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
return err
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
lopts := make([]meter.Option, 0, 2)
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ClientRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ClientRequestTotal, lopts...).Inc()
return stream, err
}
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("publish_message_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("publish_message_duration_seconds", wlabels))
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
lopts := make([]meter.Option, 0, 2)
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(PublishMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else {
metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
}
w.opts.Meter.Counter(PublishMessageTotal, lopts...).Inc()
return err
}
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
labels := getLabels(opts...)
handler := &wrapper{
labels: labels,
opts: NewOptions(opts...),
}
return handler.HandlerFunc
}
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("server_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("server_request_duration_seconds", wlabels))
ts := time.Now()
err := fn(ctx, req, rsp)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
lopts := make([]meter.Option, 0, 2)
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(ServerRequestDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else {
metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
}
w.opts.Meter.Counter(ServerRequestTotal, lopts...).Inc()
return err
}
}
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
labels := getLabels(opts...)
handler := &wrapper{
labels: labels,
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("subscribe_message_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("subscribe_message_duration_seconds", wlabels))
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
lopts := make([]meter.Option, 0, 2)
lopts = append(lopts, meter.Label(labelEndpoint, endpoint))
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, lopts...).Update(float64(te.Seconds()))
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, lopts...).Update(float64(te.Seconds()))
if err == nil {
metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelSuccess))
} else {
metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
lopts = append(lopts, meter.Label(labelStatus, labelFailure))
}
w.opts.Meter.Counter(SubscribeMessageTotal, lopts...).Inc()
return err
}