meter/wrapper: add inflight request/message count #47

Merged
vtolstov merged 1 commits from issue-46 into master 2021-05-10 17:59:40 +03:00

View File

@ -11,18 +11,25 @@ import (
) )
var ( var (
ClientRequestDurationSeconds = "client_request_duration_seconds" ClientRequestDurationSeconds = "client_request_duration_seconds"
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
ClientRequestTotal = "client_request_total" ClientRequestTotal = "client_request_total"
ServerRequestDurationSeconds = "server_request_duration_seconds" ClientRequestInflight = "client_request_inflight"
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
ServerRequestTotal = "server_request_total" ServerRequestDurationSeconds = "server_request_duration_seconds"
PublishMessageDurationSeconds = "publish_message_duration_seconds" ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" ServerRequestTotal = "server_request_total"
PublishMessageTotal = "publish_message_total" ServerRequestInflight = "server_request_inflight"
PublishMessageDurationSeconds = "publish_message_duration_seconds"
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
PublishMessageTotal = "publish_message_total"
PublishMessageInflight = "publish_message_inflight"
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
SubscribeMessageTotal = "subscribe_message_total" SubscribeMessageTotal = "subscribe_message_total"
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success" labelSuccess = "success"
labelFailure = "failure" labelFailure = "failure"
@ -116,13 +123,16 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
return w.callFunc(ctx, addr, req, rsp, opts) return w.callFunc(ctx, addr, req, rsp, opts)
} }
} }
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
labels := make([]string, 0, 4) labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint) labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
@ -144,12 +154,14 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...) err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
@ -172,12 +184,14 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...) stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
@ -195,12 +209,14 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic() endpoint := p.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := w.Client.Publish(ctx, p, opts...) err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
@ -231,12 +247,14 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
} }
} }
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := fn(ctx, req, rsp) err := fn(ctx, req, rsp)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
@ -263,12 +281,14 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
return func(ctx context.Context, msg server.Message) error { return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic() endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
err := fn(ctx, msg) err := fn(ctx, msg)
te := time.Since(ts) te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())