meter/wrapper: add inflight request/message count #47
@ -11,18 +11,25 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
ClientRequestDurationSeconds = "client_request_duration_seconds"
|
||||||
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
ClientRequestLatencyMicroseconds = "client_request_latency_microseconds"
|
||||||
ClientRequestTotal = "client_request_total"
|
ClientRequestTotal = "client_request_total"
|
||||||
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
ClientRequestInflight = "client_request_inflight"
|
||||||
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
|
||||||
ServerRequestTotal = "server_request_total"
|
ServerRequestDurationSeconds = "server_request_duration_seconds"
|
||||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
ServerRequestLatencyMicroseconds = "server_request_latency_microseconds"
|
||||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
ServerRequestTotal = "server_request_total"
|
||||||
PublishMessageTotal = "publish_message_total"
|
ServerRequestInflight = "server_request_inflight"
|
||||||
|
|
||||||
|
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||||
|
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||||
|
PublishMessageTotal = "publish_message_total"
|
||||||
|
PublishMessageInflight = "publish_message_inflight"
|
||||||
|
|
||||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||||
SubscribeMessageTotal = "subscribe_message_total"
|
SubscribeMessageTotal = "subscribe_message_total"
|
||||||
|
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||||
|
|
||||||
labelSuccess = "success"
|
labelSuccess = "success"
|
||||||
labelFailure = "failure"
|
labelFailure = "failure"
|
||||||
@ -116,13 +123,16 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
|
|||||||
return w.callFunc(ctx, addr, req, rsp, opts)
|
return w.callFunc(ctx, addr, req, rsp, opts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ts := time.Now()
|
|
||||||
err := w.callFunc(ctx, addr, req, rsp, opts)
|
|
||||||
te := time.Since(ts)
|
|
||||||
|
|
||||||
labels := make([]string, 0, 4)
|
labels := make([]string, 0, 4)
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||||
|
ts := time.Now()
|
||||||
|
err := w.callFunc(ctx, addr, req, rsp, opts)
|
||||||
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||||
|
|
||||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||||
|
|
||||||
@ -144,12 +154,14 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := w.Client.Call(ctx, req, rsp, opts...)
|
err := w.Client.Call(ctx, req, rsp, opts...)
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||||
@ -172,12 +184,14 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
stream, err := w.Client.Stream(ctx, req, opts...)
|
stream, err := w.Client.Stream(ctx, req, opts...)
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec()
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||||
@ -195,12 +209,14 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
|
|||||||
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
endpoint := p.Topic()
|
endpoint := p.Topic()
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := w.Client.Publish(ctx, p, opts...)
|
err := w.Client.Publish(ctx, p, opts...)
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||||
@ -231,12 +247,14 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := fn(ctx, req, rsp)
|
err := fn(ctx, req, rsp)
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec()
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||||
@ -263,12 +281,14 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
|
|||||||
return func(ctx context.Context, msg server.Message) error {
|
return func(ctx context.Context, msg server.Message) error {
|
||||||
endpoint := msg.Topic()
|
endpoint := msg.Topic()
|
||||||
|
|
||||||
|
labels := make([]string, 0, 4)
|
||||||
|
labels = append(labels, labelEndpoint, endpoint)
|
||||||
|
|
||||||
|
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := fn(ctx, msg)
|
err := fn(ctx, msg)
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
|
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
|
||||||
labels := make([]string, 0, 4)
|
|
||||||
labels = append(labels, labelEndpoint, endpoint)
|
|
||||||
|
|
||||||
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||||
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
|
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||||
|
Loading…
Reference in New Issue
Block a user