Merge pull request 'meter: move metrics handling in broker implementations' (#215) from metrics into master

Reviewed-on: #215
This commit is contained in:
Василий Толстов 2023-04-27 15:32:56 +03:00
commit 0e587d923e
3 changed files with 64 additions and 96 deletions

View File

@ -3,6 +3,7 @@ package broker
import ( import (
"context" "context"
"sync" "sync"
"time"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
@ -134,13 +135,22 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
eh := m.opts.ErrorHandler eh := m.opts.ErrorHandler
for t, ms := range msgTopicMap { for t, ms := range msgTopicMap {
ts := time.Now()
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
m.RLock() m.RLock()
subs, ok := m.subscribers[t] subs, ok := m.subscribers[t]
m.RUnlock() m.RUnlock()
if !ok { if !ok {
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
continue continue
} }
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
for _, sub := range subs { for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil { if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler beh = sub.opts.BatchErrorHandler
@ -152,37 +162,65 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
switch { switch {
// batch processing // batch processing
case sub.batchhandler != nil: case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil { if err = sub.batchhandler(ms); err != nil {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
ms.SetError(err) ms.SetError(err)
if beh != nil { if beh != nil {
_ = beh(ms) _ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error()) m.opts.Logger.Error(m.opts.Context, err.Error())
} }
} else if sub.opts.AutoAck { } else {
if err = ms.Ack(); err != nil { if sub.opts.AutoAck {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) if err = ms.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
} }
} }
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
// single processing // single processing
case sub.handler != nil: case sub.handler != nil:
for _, p := range ms { for _, p := range ms {
if err = sub.handler(p); err != nil { if err = sub.handler(p); err != nil {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
p.SetError(err) p.SetError(err)
if eh != nil { if eh != nil {
_ = eh(p) _ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {
m.opts.Logger.Error(m.opts.Context, err.Error()) m.opts.Logger.Error(m.opts.Context, err.Error())
} }
} else if sub.opts.AutoAck { } else {
if err = p.Ack(); err != nil { if sub.opts.AutoAck {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) if err = p.Ack(); err != nil {
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
}
} else {
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
} }
} }
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
} }
} }
} }
te := time.Since(ts)
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
} }
} }
return nil return nil

View File

@ -12,6 +12,25 @@ import (
"go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/tracer"
) )
var (
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
)
// Options struct // Options struct
type Options struct { type Options struct {
// Tracer used for tracing // Tracer used for tracing

View File

@ -27,22 +27,6 @@ var (
ServerRequestTotal = "server_request_total" ServerRequestTotal = "server_request_total"
// ServerRequestInflight specifies meter metric name // ServerRequestInflight specifies meter metric name
ServerRequestInflight = "server_request_inflight" ServerRequestInflight = "server_request_inflight"
// PublishMessageDurationSeconds specifies meter metric name
PublishMessageDurationSeconds = "publish_message_duration_seconds"
// PublishMessageLatencyMicroseconds specifies meter metric name
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
// PublishMessageTotal specifies meter metric name
PublishMessageTotal = "publish_message_total"
// PublishMessageInflight specifies meter metric name
PublishMessageInflight = "publish_message_inflight"
// SubscribeMessageDurationSeconds specifies meter metric name
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
// SubscribeMessageLatencyMicroseconds specifies meter metric name
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
// SubscribeMessageTotal specifies meter metric name
SubscribeMessageTotal = "subscribe_message_total"
// SubscribeMessageInflight specifies meter metric name
SubscribeMessageInflight = "subscribe_message_inflight"
labelSuccess = "success" labelSuccess = "success"
labelFailure = "failure" labelFailure = "failure"
@ -230,37 +214,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
} }
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic() return w.Client.Publish(ctx, p, opts...)
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
return err
}
// NewHandlerWrapper create new server handler wrapper
// deprecated
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.HandlerFunc
} }
// NewServerHandlerWrapper create new server handler wrapper // NewServerHandlerWrapper create new server handler wrapper
@ -302,46 +256,3 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return err return err
} }
} }
// NewSubscriberWrapper create server subscribe wrapper
// deprecated
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
handler := &wrapper{
opts: NewOptions(opts...),
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
labels := make([]string, 0, 4)
labels = append(labels, labelEndpoint, endpoint)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc()
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec()
w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds())
w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds())
if err == nil {
labels = append(labels, labelStatus, labelSuccess)
} else {
labels = append(labels, labelStatus, labelFailure)
}
w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc()
return err
}
}