meter: move metrics handling in broker implementations
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
@@ -134,13 +135,22 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
||||
eh := m.opts.ErrorHandler
|
||||
|
||||
for t, ms := range msgTopicMap {
|
||||
ts := time.Now()
|
||||
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms))
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms))
|
||||
|
||||
m.RLock()
|
||||
subs, ok := m.subscribers[t]
|
||||
m.RUnlock()
|
||||
if !ok {
|
||||
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms))
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
continue
|
||||
}
|
||||
|
||||
m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms))
|
||||
for _, sub := range subs {
|
||||
if sub.opts.BatchErrorHandler != nil {
|
||||
beh = sub.opts.BatchErrorHandler
|
||||
@@ -152,37 +162,65 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub
|
||||
switch {
|
||||
// batch processing
|
||||
case sub.batchhandler != nil:
|
||||
|
||||
if err = sub.batchhandler(ms); err != nil {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
ms.SetError(err)
|
||||
if beh != nil {
|
||||
_ = beh(ms)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = ms.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
} else {
|
||||
if sub.opts.AutoAck {
|
||||
if err = ms.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
} else {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
} else {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
}
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms))
|
||||
// single processing
|
||||
case sub.handler != nil:
|
||||
for _, p := range ms {
|
||||
if err = sub.handler(p); err != nil {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
p.SetError(err)
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
} else if m.opts.Logger.V(logger.ErrorLevel) {
|
||||
m.opts.Logger.Error(m.opts.Context, err.Error())
|
||||
}
|
||||
} else if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
} else {
|
||||
if sub.opts.AutoAck {
|
||||
if err = p.Ack(); err != nil {
|
||||
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc()
|
||||
} else {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
} else {
|
||||
m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc()
|
||||
}
|
||||
}
|
||||
m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1)
|
||||
m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
te := time.Since(ts)
|
||||
m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds())
|
||||
m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@@ -12,6 +12,53 @@ import (
|
||||
"go.unistack.org/micro/v4/tracer"
|
||||
)
|
||||
|
||||
/*
|
||||
|
||||
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||
endpoint := p.Topic()
|
||||
|
||||
labels := make([]string, 0, 4)
|
||||
labels = append(labels, labelEndpoint, endpoint)
|
||||
|
||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
err := w.Client.Publish(ctx, p, opts...)
|
||||
te := time.Since(ts)
|
||||
w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec()
|
||||
|
||||
w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds())
|
||||
|
||||
if err == nil {
|
||||
labels = append(labels, labelStatus, labelSuccess)
|
||||
} else {
|
||||
labels = append(labels, labelStatus, labelFailure)
|
||||
}
|
||||
w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc()
|
||||
|
||||
return err
|
||||
}
|
||||
*/
|
||||
|
||||
var (
|
||||
// PublishMessageDurationSeconds specifies meter metric name
|
||||
PublishMessageDurationSeconds = "publish_message_duration_seconds"
|
||||
// PublishMessageLatencyMicroseconds specifies meter metric name
|
||||
PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds"
|
||||
// PublishMessageTotal specifies meter metric name
|
||||
PublishMessageTotal = "publish_message_total"
|
||||
// PublishMessageInflight specifies meter metric name
|
||||
PublishMessageInflight = "publish_message_inflight"
|
||||
// SubscribeMessageDurationSeconds specifies meter metric name
|
||||
SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds"
|
||||
// SubscribeMessageLatencyMicroseconds specifies meter metric name
|
||||
SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds"
|
||||
// SubscribeMessageTotal specifies meter metric name
|
||||
SubscribeMessageTotal = "subscribe_message_total"
|
||||
// SubscribeMessageInflight specifies meter metric name
|
||||
SubscribeMessageInflight = "subscribe_message_inflight"
|
||||
)
|
||||
|
||||
// Options struct
|
||||
type Options struct {
|
||||
// Tracer used for tracing
|
||||
|
Reference in New Issue
Block a user