diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go index 529d1e25..d52ca94a 100644 --- a/meter/wrapper/wrapper.go +++ b/meter/wrapper/wrapper.go @@ -11,18 +11,25 @@ import ( ) var ( - ClientRequestDurationSeconds = "client_request_duration_seconds" - ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" - ClientRequestTotal = "client_request_total" - ServerRequestDurationSeconds = "server_request_duration_seconds" - ServerRequestLatencyMicroseconds = "server_request_latency_microseconds" - ServerRequestTotal = "server_request_total" - PublishMessageDurationSeconds = "publish_message_duration_seconds" - PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" - PublishMessageTotal = "publish_message_total" + ClientRequestDurationSeconds = "client_request_duration_seconds" + ClientRequestLatencyMicroseconds = "client_request_latency_microseconds" + ClientRequestTotal = "client_request_total" + ClientRequestInflight = "client_request_inflight" + + ServerRequestDurationSeconds = "server_request_duration_seconds" + ServerRequestLatencyMicroseconds = "server_request_latency_microseconds" + ServerRequestTotal = "server_request_total" + ServerRequestInflight = "server_request_inflight" + + PublishMessageDurationSeconds = "publish_message_duration_seconds" + PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" + PublishMessageTotal = "publish_message_total" + PublishMessageInflight = "publish_message_inflight" + SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" SubscribeMessageTotal = "subscribe_message_total" + SubscribeMessageInflight = "subscribe_message_inflight" labelSuccess = "success" labelFailure = "failure" @@ -116,13 +123,16 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, return w.callFunc(ctx, addr, req, rsp, opts) } } - ts := time.Now() - err := w.callFunc(ctx, addr, req, rsp, opts) - te := time.Since(ts) labels := make([]string, 0, 4) labels = append(labels, labelEndpoint, endpoint) + w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc() + ts := time.Now() + err := w.callFunc(ctx, addr, req, rsp, opts) + te := time.Since(ts) + w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec() + w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) @@ -144,12 +154,14 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, } } + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc() ts := time.Now() err := w.Client.Call(ctx, req, rsp, opts...) te := time.Since(ts) - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) + w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec() w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) @@ -172,12 +184,14 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client } } + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(ClientRequestInflight, labels...).Inc() ts := time.Now() stream, err := w.Client.Stream(ctx, req, opts...) te := time.Since(ts) - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) + w.opts.Meter.Counter(ClientRequestInflight, labels...).Dec() w.opts.Meter.Summary(ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ClientRequestDurationSeconds, labels...).Update(te.Seconds()) @@ -195,12 +209,14 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { endpoint := p.Topic() + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc() ts := time.Now() err := w.Client.Publish(ctx, p, opts...) te := time.Since(ts) - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) + w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec() w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds()) @@ -231,12 +247,14 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { } } + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(ServerRequestInflight, labels...).Inc() ts := time.Now() err := fn(ctx, req, rsp) te := time.Since(ts) - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) + w.opts.Meter.Counter(ServerRequestInflight, labels...).Dec() w.opts.Meter.Summary(ServerRequestLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(ServerRequestDurationSeconds, labels...).Update(te.Seconds()) @@ -263,12 +281,14 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc return func(ctx context.Context, msg server.Message) error { endpoint := msg.Topic() + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc() ts := time.Now() err := fn(ctx, msg) te := time.Since(ts) - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) + w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec() w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())