From 054bd02b5981f7ff6f4941fe7c96372b460ec01e Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 27 Apr 2023 15:30:55 +0300 Subject: [PATCH 1/2] meter: move metrics handling in broker implementations Signed-off-by: Vasiliy Tolstov --- broker/memory.go | 50 +++++++++++++++++++--- broker/options.go | 47 +++++++++++++++++++++ meter/wrapper/wrapper.go | 91 +--------------------------------------- 3 files changed, 92 insertions(+), 96 deletions(-) diff --git a/broker/memory.go b/broker/memory.go index 015a40fc..08d55d55 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -3,6 +3,7 @@ package broker import ( "context" "sync" + "time" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/metadata" @@ -134,13 +135,22 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub eh := m.opts.ErrorHandler for t, ms := range msgTopicMap { + ts := time.Now() + + m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(len(ms)) + m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(len(ms)) + m.RLock() subs, ok := m.subscribers[t] m.RUnlock() if !ok { + m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "failure").Add(len(ms)) + m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms)) + m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms)) continue } + m.opts.Meter.Counter(PublishMessageTotal, "endpoint", t, "status", "success").Add(len(ms)) for _, sub := range subs { if sub.opts.BatchErrorHandler != nil { beh = sub.opts.BatchErrorHandler @@ -152,37 +162,65 @@ func (m *memoryBroker) publish(ctx context.Context, msgs []*Message, opts ...Pub switch { // batch processing case sub.batchhandler != nil: + if err = sub.batchhandler(ms); err != nil { + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() ms.SetError(err) if beh != nil { _ = beh(ms) } else if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, err.Error()) } - } else if sub.opts.AutoAck { - if err = ms.Ack(); err != nil { - m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } else { + if sub.opts.AutoAck { + if err = ms.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() + } else { + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() + } + } else { + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() } } + m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-len(ms)) + m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-len(ms)) // single processing case sub.handler != nil: for _, p := range ms { if err = sub.handler(p); err != nil { + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() p.SetError(err) if eh != nil { _ = eh(p) } else if m.opts.Logger.V(logger.ErrorLevel) { m.opts.Logger.Error(m.opts.Context, err.Error()) } - } else if sub.opts.AutoAck { - if err = p.Ack(); err != nil { - m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + } else { + if sub.opts.AutoAck { + if err = p.Ack(); err != nil { + m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "failure").Inc() + } else { + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() + } + } else { + m.opts.Meter.Counter(SubscribeMessageTotal, "endpoint", t, "status", "success").Inc() } } + m.opts.Meter.Counter(PublishMessageInflight, "endpoint", t).Add(-1) + m.opts.Meter.Counter(SubscribeMessageInflight, "endpoint", t).Add(-1) } } + } + te := time.Since(ts) + m.opts.Meter.Summary(PublishMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds()) + m.opts.Meter.Histogram(PublishMessageDurationSeconds, "endpoint", t).Update(te.Seconds()) + m.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, "endpoint", t).Update(te.Seconds()) + m.opts.Meter.Histogram(SubscribeMessageDurationSeconds, "endpoint", t).Update(te.Seconds()) } + } return nil diff --git a/broker/options.go b/broker/options.go index 6a1d6a5a..989f5fda 100644 --- a/broker/options.go +++ b/broker/options.go @@ -12,6 +12,53 @@ import ( "go.unistack.org/micro/v4/tracer" ) +/* + +func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + endpoint := p.Topic() + + labels := make([]string, 0, 4) + labels = append(labels, labelEndpoint, endpoint) + + w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc() + ts := time.Now() + err := w.Client.Publish(ctx, p, opts...) + te := time.Since(ts) + w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec() + + w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds()) + w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds()) + + if err == nil { + labels = append(labels, labelStatus, labelSuccess) + } else { + labels = append(labels, labelStatus, labelFailure) + } + w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc() + + return err +} +*/ + +var ( + // PublishMessageDurationSeconds specifies meter metric name + PublishMessageDurationSeconds = "publish_message_duration_seconds" + // PublishMessageLatencyMicroseconds specifies meter metric name + PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" + // PublishMessageTotal specifies meter metric name + PublishMessageTotal = "publish_message_total" + // PublishMessageInflight specifies meter metric name + PublishMessageInflight = "publish_message_inflight" + // SubscribeMessageDurationSeconds specifies meter metric name + SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" + // SubscribeMessageLatencyMicroseconds specifies meter metric name + SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" + // SubscribeMessageTotal specifies meter metric name + SubscribeMessageTotal = "subscribe_message_total" + // SubscribeMessageInflight specifies meter metric name + SubscribeMessageInflight = "subscribe_message_inflight" +) + // Options struct type Options struct { // Tracer used for tracing diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go index 979dac27..c5a5e6a9 100644 --- a/meter/wrapper/wrapper.go +++ b/meter/wrapper/wrapper.go @@ -27,22 +27,6 @@ var ( ServerRequestTotal = "server_request_total" // ServerRequestInflight specifies meter metric name ServerRequestInflight = "server_request_inflight" - // PublishMessageDurationSeconds specifies meter metric name - PublishMessageDurationSeconds = "publish_message_duration_seconds" - // PublishMessageLatencyMicroseconds specifies meter metric name - PublishMessageLatencyMicroseconds = "publish_message_latency_microseconds" - // PublishMessageTotal specifies meter metric name - PublishMessageTotal = "publish_message_total" - // PublishMessageInflight specifies meter metric name - PublishMessageInflight = "publish_message_inflight" - // SubscribeMessageDurationSeconds specifies meter metric name - SubscribeMessageDurationSeconds = "subscribe_message_duration_seconds" - // SubscribeMessageLatencyMicroseconds specifies meter metric name - SubscribeMessageLatencyMicroseconds = "subscribe_message_latency_microseconds" - // SubscribeMessageTotal specifies meter metric name - SubscribeMessageTotal = "subscribe_message_total" - // SubscribeMessageInflight specifies meter metric name - SubscribeMessageInflight = "subscribe_message_inflight" labelSuccess = "success" labelFailure = "failure" @@ -230,37 +214,7 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client } func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - endpoint := p.Topic() - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc() - ts := time.Now() - err := w.Client.Publish(ctx, p, opts...) - te := time.Since(ts) - w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec() - - w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc() - - return err -} - -// NewHandlerWrapper create new server handler wrapper -// deprecated -func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.HandlerFunc + return w.Client.Publish(ctx, p, opts...) } // NewServerHandlerWrapper create new server handler wrapper @@ -302,46 +256,3 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { return err } } - -// NewSubscriberWrapper create server subscribe wrapper -// deprecated -func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.SubscriberFunc -} - -func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { - handler := &wrapper{ - opts: NewOptions(opts...), - } - return handler.SubscriberFunc -} - -func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { - return func(ctx context.Context, msg server.Message) error { - endpoint := msg.Topic() - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc() - ts := time.Now() - err := fn(ctx, msg) - te := time.Since(ts) - w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec() - - w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc() - - return err - } -} From fa0248c80c502c5c7af4c9918cef4c1b36e06394 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 27 Apr 2023 15:31:59 +0300 Subject: [PATCH 2/2] cleanup Signed-off-by: Vasiliy Tolstov --- broker/options.go | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/broker/options.go b/broker/options.go index 989f5fda..b5b8d933 100644 --- a/broker/options.go +++ b/broker/options.go @@ -12,34 +12,6 @@ import ( "go.unistack.org/micro/v4/tracer" ) -/* - -func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { - endpoint := p.Topic() - - labels := make([]string, 0, 4) - labels = append(labels, labelEndpoint, endpoint) - - w.opts.Meter.Counter(PublishMessageInflight, labels...).Inc() - ts := time.Now() - err := w.Client.Publish(ctx, p, opts...) - te := time.Since(ts) - w.opts.Meter.Counter(PublishMessageInflight, labels...).Dec() - - w.opts.Meter.Summary(PublishMessageLatencyMicroseconds, labels...).Update(te.Seconds()) - w.opts.Meter.Histogram(PublishMessageDurationSeconds, labels...).Update(te.Seconds()) - - if err == nil { - labels = append(labels, labelStatus, labelSuccess) - } else { - labels = append(labels, labelStatus, labelFailure) - } - w.opts.Meter.Counter(PublishMessageTotal, labels...).Inc() - - return err -} -*/ - var ( // PublishMessageDurationSeconds specifies meter metric name PublishMessageDurationSeconds = "publish_message_duration_seconds"