prometheus.go
@@ -5,11 +5,10 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/micro/go-micro/v2/client"
-	"github.com/micro/go-micro/v2/logger"
-	"github.com/micro/go-micro/v2/registry"
-	"github.com/micro/go-micro/v2/server"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/unistack-org/micro/v3/client"
+	"github.com/unistack-org/micro/v3/logger"
+	"github.com/unistack-org/micro/v3/server"
 )
 
 var (
@@ -18,9 +17,21 @@ var (
 	// default label prefix
 	DefaultLabelPrefix = "micro_"
 
-	opsCounter *prometheus.CounterVec
-	timeCounterSummary *prometheus.SummaryVec
-	timeCounterHistogram *prometheus.HistogramVec
+	clientOpsCounter *prometheus.CounterVec
+	clientTimeCounterSummary *prometheus.SummaryVec
+	clientTimeCounterHistogram *prometheus.HistogramVec
+
+	serverOpsCounter *prometheus.CounterVec
+	serverTimeCounterSummary *prometheus.SummaryVec
+	serverTimeCounterHistogram *prometheus.HistogramVec
+
+	publishOpsCounter *prometheus.CounterVec
+	publishTimeCounterSummary *prometheus.SummaryVec
+	publishTimeCounterHistogram *prometheus.HistogramVec
+
+	subscribeOpsCounter *prometheus.CounterVec
+	subscribeTimeCounterSummary *prometheus.SummaryVec
+	subscribeTimeCounterHistogram *prometheus.HistogramVec
 
 	mu sync.Mutex
 )
@@ -29,34 +40,224 @@ type Options struct {
 	Name string
 	Version string
 	ID string
+	Context context.Context
 }
 
 type Option func(*Options)
 
+func Context(ctx context.Context) Option {
+	return func(o *Options) {
+		o.Context = ctx
+	}
+}
+
 func ServiceName(name string) Option {
-	return func(opts *Options) {
-		opts.Name = name
+	return func(o *Options) {
+		o.Name = name
 	}
 }
 
 func ServiceVersion(version string) Option {
-	return func(opts *Options) {
-		opts.Version = version
+	return func(o *Options) {
+		o.Version = version
 	}
 }
 
 func ServiceID(id string) Option {
-	return func(opts *Options) {
-		opts.ID = id
+	return func(o *Options) {
+		o.ID = id
 	}
 }
 
-func registerMetrics() {
+func registerServerMetrics(ctx context.Context) {
 	mu.Lock()
 	defer mu.Unlock()
 
-	if opsCounter == nil {
-		opsCounter = prometheus.NewCounterVec(
+	if serverOpsCounter == nil {
+		serverOpsCounter = prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: fmt.Sprintf("%sserver_request_total", DefaultMetricPrefix),
+				Help: "Requests processed, partitioned by endpoint and status",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
+			},
+		)
+	}
+
+	if serverTimeCounterSummary == nil {
+		serverTimeCounterSummary = prometheus.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Name: fmt.Sprintf("%sserver_latency_microseconds", DefaultMetricPrefix),
+				Help: "Request latencies in microseconds, partitioned by endpoint",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+			},
+		)
+	}
+
+	if serverTimeCounterHistogram == nil {
+		serverTimeCounterHistogram = prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name: fmt.Sprintf("%sserver_request_duration_seconds", DefaultMetricPrefix),
+				Help: "Request time in seconds, partitioned by endpoint",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+			},
+		)
+	}
+
+	for _, collector := range []prometheus.Collector{serverOpsCounter, serverTimeCounterSummary, serverTimeCounterHistogram} {
+		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
+			// if already registered, skip fatal
+			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
+				logger.Fatal(ctx, err.Error())
+			}
+		}
+	}
+
+}
+
+func registerPublishMetrics(ctx context.Context) {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if publishOpsCounter == nil {
+		publishOpsCounter = prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: fmt.Sprintf("%spublish_message_total", DefaultMetricPrefix),
+				Help: "Messages sent, partitioned by endpoint and status",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
+			},
+		)
+	}
+
+	if publishTimeCounterSummary == nil {
+		publishTimeCounterSummary = prometheus.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Name: fmt.Sprintf("%spublish_message_latency_microseconds", DefaultMetricPrefix),
+				Help: "Message latencies in microseconds, partitioned by endpoint",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+			},
+		)
+	}
+
+	if publishTimeCounterHistogram == nil {
+		publishTimeCounterHistogram = prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name: fmt.Sprintf("%spublish_message_duration_seconds", DefaultMetricPrefix),
+				Help: "Message publish time in seconds, partitioned by endpoint",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+			},
+		)
+	}
+
+	for _, collector := range []prometheus.Collector{publishOpsCounter, publishTimeCounterSummary, publishTimeCounterHistogram} {
+		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
+			// if already registered, skip fatal
+			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
+				logger.Fatal(ctx, err.Error())
+			}
+		}
+	}
+
+}
+
+func registerSubscribeMetrics(ctx context.Context) {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if subscribeOpsCounter == nil {
+		subscribeOpsCounter = prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: fmt.Sprintf("%ssubscribe_message_total", DefaultMetricPrefix),
+				Help: "Messages processed, partitioned by endpoint and status",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
+			},
+		)
+	}
+
+	if subscribeTimeCounterSummary == nil {
+		subscribeTimeCounterSummary = prometheus.NewSummaryVec(
+			prometheus.SummaryOpts{
+				Name: fmt.Sprintf("%ssubscribe_message_latency_microseconds", DefaultMetricPrefix),
+				Help: "Message processing latencies in microseconds, partitioned by endpoint",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+			},
+		)
+	}
+
+	if subscribeTimeCounterHistogram == nil {
+		subscribeTimeCounterHistogram = prometheus.NewHistogramVec(
+			prometheus.HistogramOpts{
+				Name: fmt.Sprintf("%ssubscribe_message_duration_seconds", DefaultMetricPrefix),
+				Help: "Request time in seconds, partitioned by endpoint",
+			},
+			[]string{
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
+				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
+			},
+		)
+	}
+
+	for _, collector := range []prometheus.Collector{subscribeOpsCounter, subscribeTimeCounterSummary, subscribeTimeCounterHistogram} {
+		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
+			// if already registered, skip fatal
+			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
+				logger.Fatal(ctx, err.Error())
+			}
+		}
+	}
+
+}
+
+func registerClientMetrics(ctx context.Context) {
+	mu.Lock()
+	defer mu.Unlock()
+
+	if clientOpsCounter == nil {
+		clientOpsCounter = prometheus.NewCounterVec(
 			prometheus.CounterOpts{
 				Name: fmt.Sprintf("%srequest_total", DefaultMetricPrefix),
 				Help: "Requests processed, partitioned by endpoint and status",
@@ -71,8 +272,8 @@ func registerMetrics() {
 		)
 	}
 
-	if timeCounterSummary == nil {
-		timeCounterSummary = prometheus.NewSummaryVec(
+	if clientTimeCounterSummary == nil {
+		clientTimeCounterSummary = prometheus.NewSummaryVec(
 			prometheus.SummaryOpts{
 				Name: fmt.Sprintf("%slatency_microseconds", DefaultMetricPrefix),
 				Help: "Request latencies in microseconds, partitioned by endpoint",
@@ -86,8 +287,8 @@ func registerMetrics() {
 		)
 	}
 
-	if timeCounterHistogram == nil {
-		timeCounterHistogram = prometheus.NewHistogramVec(
+	if clientTimeCounterHistogram == nil {
+		clientTimeCounterHistogram = prometheus.NewHistogramVec(
 			prometheus.HistogramOpts{
 				Name: fmt.Sprintf("%srequest_duration_seconds", DefaultMetricPrefix),
 				Help: "Request time in seconds, partitioned by endpoint",
@@ -101,11 +302,11 @@ func registerMetrics() {
 		)
 	}
 
-	for _, collector := range []prometheus.Collector{opsCounter, timeCounterSummary, timeCounterHistogram} {
+	for _, collector := range []prometheus.Collector{clientOpsCounter, clientTimeCounterSummary, clientTimeCounterHistogram} {
 		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
 			// if already registered, skip fatal
 			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
-				logger.Fatal(err)
+				logger.Fatal(ctx, err.Error())
 			}
 		}
 	}
@@ -119,13 +320,14 @@ type wrapper struct {
 }
 
 func NewClientWrapper(opts ...Option) client.Wrapper {
-	registerMetrics()
-
-	options := Options{}
-	for _, opt := range opts {
-		opt(&options)
+	options := Options{Context: context.Background()}
+	for _, o := range opts {
+		o(&options)
 	}
 
+	registerClientMetrics(options.Context)
+	registerPublishMetrics(options.Context)
+
 	return func(c client.Client) client.Client {
 		handler := &wrapper{
 			options: options,
@@ -137,13 +339,13 @@ func NewClientWrapper(opts ...Option) client.Wrapper {
 }
 
 func NewCallWrapper(opts ...Option) client.CallWrapper {
-	registerMetrics()
-
-	options := Options{}
-	for _, opt := range opts {
-		opt(&options)
+	options := Options{Context: context.Background()}
+	for _, o := range opts {
+		o(&options)
 	}
 
+	registerClientMetrics(options.Context)
+
 	return func(fn client.CallFunc) client.CallFunc {
 		handler := &wrapper{
 			options: options,
@@ -154,21 +356,21 @@ func NewCallWrapper(opts ...Option) client.CallWrapper {
 	}
 }
 
-func (w *wrapper) CallFunc(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
+func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
 	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
 		us := v * 1000000 // make microseconds
-		timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
-		timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
+		clientTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
+		clientTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
 	}))
 	defer timer.ObserveDuration()
 
-	err := w.callFunc(ctx, node, req, rsp, opts)
+	err := w.callFunc(ctx, addr, req, rsp, opts)
 	if err == nil {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
+		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
 	} else {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
+		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
 	}
 
 	return err
@@ -180,16 +382,16 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
 		us := v * 1000000 // make microseconds
-		timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
-		timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
+		clientTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
+		clientTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
 	}))
 	defer timer.ObserveDuration()
 
 	err := w.Client.Call(ctx, req, rsp, opts...)
 	if err == nil {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
+		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
 	} else {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
+		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
 	}
 
 	return err
@@ -200,16 +402,16 @@ func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
 		us := v * 1000000 // make microseconds
-		timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
-		timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
+		clientTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
+		clientTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
 	}))
 	defer timer.ObserveDuration()
 
 	stream, err := w.Client.Stream(ctx, req, opts...)
 	if err == nil {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
+		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
 	} else {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
+		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
 	}
 
 	return stream, err
@@ -220,28 +422,27 @@ func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
 		us := v * 1000000 // make microseconds
-		timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
-		timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
+		publishTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
+		publishTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
 	}))
 	defer timer.ObserveDuration()
 
 	err := w.Client.Publish(ctx, p, opts...)
 	if err == nil {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
+		publishOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
 	} else {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
+		publishOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
 	}
 
 	return err
 }
 
 func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
-	registerMetrics()
-
-	options := Options{}
-	for _, opt := range opts {
-		opt(&options)
+	options := Options{Context: context.Background()}
+	for _, o := range opts {
+		o(&options)
 	}
+	registerServerMetrics(options.Context)
 
 	handler := &wrapper{
 		options: options,
@@ -256,16 +457,16 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
 		us := v * 1000000 // make microseconds
-		timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
-		timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
+		serverTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
+		serverTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
 	}))
 	defer timer.ObserveDuration()
 
 	err := fn(ctx, req, rsp)
 	if err == nil {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
+		serverOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
 	} else {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
+		serverOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
 	}
 
 	return err
@@ -273,13 +474,13 @@ func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
 }
 
 func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
-	registerMetrics()
-
-	options := Options{}
-	for _, opt := range opts {
-		opt(&options)
+	options := Options{Context: context.Background()}
+	for _, o := range opts {
+		o(&options)
 	}
 
+	registerSubscribeMetrics(options.Context)
+
 	handler := &wrapper{
 		options: options,
 	}
@@ -293,16 +494,16 @@ func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
 		us := v * 1000000 // make microseconds
-		timeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
-		timeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
+		subscribeTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
+		subscribeTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
 	}))
 	defer timer.ObserveDuration()
 
 	err := fn(ctx, msg)
 	if err == nil {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
+		subscribeOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
 	} else {
-		opsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
+		subscribeOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
 	}
 
 	return err
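A minimal usage sketch of the wrappers above, assuming the package is imported as prometheuswrapper (the import path below is hypothetical) and that the default Prometheus registry is exposed over HTTP via promhttp; how the wrappers are attached to the micro v3 client and server is left to the service's own option wiring:

package main

import (
	"context"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	// hypothetical import path for the wrapper package shown in the diff above
	prometheuswrapper "github.com/unistack-org/micro-wrapper-prometheus"
)

func main() {
	// Service identity becomes the micro_name/micro_version/micro_id label values;
	// the context is passed to logger.Fatal if metric registration fails.
	opts := []prometheuswrapper.Option{
		prometheuswrapper.ServiceName("greeter"),
		prometheuswrapper.ServiceVersion("latest"),
		prometheuswrapper.ServiceID("greeter-1"),
		prometheuswrapper.Context(context.Background()),
	}

	// The client wrapper registers the client and publish metric families;
	// the handler and subscriber wrappers register the server and subscribe ones.
	clientWrapper := prometheuswrapper.NewClientWrapper(opts...)
	handlerWrapper := prometheuswrapper.NewHandlerWrapper(opts...)
	subscriberWrapper := prometheuswrapper.NewSubscriberWrapper(opts...)
	_, _, _ = clientWrapper, handlerWrapper, subscriberWrapper // pass these via client/server wrapper options

	// All collectors are registered on prometheus.DefaultRegisterer,
	// so the stock promhttp handler serves them.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9090", nil)
}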