update to latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
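This change drops the old client/server/publish/subscribe wrapper metrics and instead implements the micro v3 meter.Meter interface on top of the Prometheus client. A minimal usage sketch of the new API follows; the import path is assumed here and should be adjusted to the real module path.

package main

import (
	"os"

	prommeter "go.unistack.org/micro-meter-prometheus/v3" // import path assumed, adjust to the real module
)

func main() {
	// NewMeter defaults to prometheus.DefaultRegisterer; metrics register lazily on first use.
	m := prommeter.NewMeter()

	// labels are passed as alternating key/value pairs
	m.Counter("request_total", "endpoint", "Example.Call").Inc()

	// dump the gathered metrics in the Prometheus text exposition format
	_ = m.Write(os.Stdout)
}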
prometheus.go
@@ -1,511 +1,340 @@
package prometheus

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/unistack-org/micro/v3/client"
	"github.com/unistack-org/micro/v3/logger"
	"github.com/unistack-org/micro/v3/server"
	"github.com/prometheus/client_golang/prometheus/collectors"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"go.unistack.org/micro/v3/meter"
)

var (
	// default metric prefix
	DefaultMetricPrefix = "micro_"
	// default label prefix
	DefaultLabelPrefix = "micro_"

	clientOpsCounter *prometheus.CounterVec
	clientTimeCounterSummary *prometheus.SummaryVec
	clientTimeCounterHistogram *prometheus.HistogramVec

	serverOpsCounter *prometheus.CounterVec
	serverTimeCounterSummary *prometheus.SummaryVec
	serverTimeCounterHistogram *prometheus.HistogramVec

	publishOpsCounter *prometheus.CounterVec
	publishTimeCounterSummary *prometheus.SummaryVec
	publishTimeCounterHistogram *prometheus.HistogramVec

	subscribeOpsCounter *prometheus.CounterVec
	subscribeTimeCounterSummary *prometheus.SummaryVec
	subscribeTimeCounterHistogram *prometheus.HistogramVec

	mu sync.Mutex
)

type Options struct {
	Name string
	Version string
	ID string
	Context context.Context
type prometheusMeter struct {
	opts meter.Options
	set prometheus.Registerer
	counter map[string]prometheusCounter
	floatCounter map[string]prometheusFloatCounter
	gauge map[string]prometheusGauge
	histogram map[string]prometheusHistogram
	summary map[string]prometheusSummary
	sync.Mutex
}

type Option func(*Options)

func Context(ctx context.Context) Option {
	return func(o *Options) {
		o.Context = ctx
func NewMeter(opts ...meter.Option) meter.Meter {
	return &prometheusMeter{
		set: prometheus.DefaultRegisterer,
		opts: meter.NewOptions(opts...),
		counter: make(map[string]prometheusCounter),
		floatCounter: make(map[string]prometheusFloatCounter),
		gauge: make(map[string]prometheusGauge),
		histogram: make(map[string]prometheusHistogram),
		summary: make(map[string]prometheusSummary),
	}
}

func ServiceName(name string) Option {
	return func(o *Options) {
		o.Name = name
func (m *prometheusMeter) buildMetric(name string, labels ...string) string {
	if len(m.opts.MetricPrefix) > 0 {
		name = m.opts.MetricPrefix + name
	}

	nl := len(m.opts.Labels) + len(labels)
	if nl == 0 {
		return name
	}

	nlabels := make([]string, 0, nl)
	nlabels = append(nlabels, m.opts.Labels...)
	nlabels = append(nlabels, labels...)

	if len(m.opts.LabelPrefix) == 0 {
		return meter.BuildName(name, nlabels...)
	}

	for idx := 0; idx < nl; idx++ {
		nlabels[idx] = m.opts.LabelPrefix + nlabels[idx]
		idx++
	}
	return meter.BuildName(name, nlabels...)
}
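// labels are alternating key/value pairs; the extra idx++ inside the loop in
// buildMetric above (and in buildLabels and mapLabels below) means only the
// keys, at even indexes, receive LabelPrefix, while the values stay untouched.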

func ServiceVersion(version string) Option {
	return func(o *Options) {
		o.Version = version
func (m *prometheusMeter) buildName(name string) string {
	if len(m.opts.MetricPrefix) > 0 {
		name = m.opts.MetricPrefix + name
	}
	return name
}

func ServiceID(id string) Option {
	return func(o *Options) {
		o.ID = id
func (m *prometheusMeter) buildLabels(labels ...string) []string {
	nl := len(m.opts.Labels) + len(labels)
	if nl == 0 {
		return nil
	}

	nlabels := make([]string, 0, nl)
	nlabels = append(nlabels, m.opts.Labels...)
	nlabels = append(nlabels, labels...)

	for idx := 0; idx < nl; idx++ {
		nlabels[idx] = m.opts.LabelPrefix + nlabels[idx]
		idx++
	}
	return nlabels
}

func registerServerMetrics(ctx context.Context) {
	mu.Lock()
	defer mu.Unlock()

	if serverOpsCounter == nil {
		serverOpsCounter = prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: fmt.Sprintf("%sserver_request_total", DefaultMetricPrefix),
				Help: "Requests processed, partitioned by endpoint and status",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
			},
		)
	}

	if serverTimeCounterSummary == nil {
		serverTimeCounterSummary = prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: fmt.Sprintf("%sserver_latency_microseconds", DefaultMetricPrefix),
				Help: "Request latencies in microseconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	if serverTimeCounterHistogram == nil {
		serverTimeCounterHistogram = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name: fmt.Sprintf("%sserver_request_duration_seconds", DefaultMetricPrefix),
				Help: "Request time in seconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	for _, collector := range []prometheus.Collector{serverOpsCounter, serverTimeCounterSummary, serverTimeCounterHistogram} {
		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
			// if already registered, skip fatal
			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
				logger.Fatal(ctx, err.Error())
			}
		}
	}

func (m *prometheusMeter) Name() string {
	return m.opts.Name
}

func registerPublishMetrics(ctx context.Context) {
	mu.Lock()
	defer mu.Unlock()

	if publishOpsCounter == nil {
		publishOpsCounter = prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: fmt.Sprintf("%spublish_message_total", DefaultMetricPrefix),
				Help: "Messages sent, partitioned by endpoint and status",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
			},
		)
func (m *prometheusMeter) mapLabels(labels ...string) map[string]string {
	labels = m.buildLabels(labels...)
	elementMap := make(map[string]string, len(labels)/2)
	for idx := 0; idx < len(labels); idx++ {
		elementMap[labels[idx]] = labels[idx+1]
		idx++
	}

	if publishTimeCounterSummary == nil {
		publishTimeCounterSummary = prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: fmt.Sprintf("%spublish_message_latency_microseconds", DefaultMetricPrefix),
				Help: "Message latencies in microseconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	if publishTimeCounterHistogram == nil {
		publishTimeCounterHistogram = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name: fmt.Sprintf("%spublish_message_duration_seconds", DefaultMetricPrefix),
				Help: "Message publish time in seconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	for _, collector := range []prometheus.Collector{publishOpsCounter, publishTimeCounterSummary, publishTimeCounterHistogram} {
		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
			// if already registered, skip fatal
			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
				logger.Fatal(ctx, err.Error())
			}
		}
	}

	return elementMap
}

func registerSubscribeMetrics(ctx context.Context) {
	mu.Lock()
	defer mu.Unlock()
func (m *prometheusMeter) Counter(name string, labels ...string) meter.Counter {
	m.Lock()
	defer m.Unlock()

	if subscribeOpsCounter == nil {
		subscribeOpsCounter = prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: fmt.Sprintf("%ssubscribe_message_total", DefaultMetricPrefix),
				Help: "Messages processed, partitioned by endpoint and status",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
			},
		)
	}

	if subscribeTimeCounterSummary == nil {
		subscribeTimeCounterSummary = prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: fmt.Sprintf("%ssubscribe_message_latency_microseconds", DefaultMetricPrefix),
				Help: "Message processing latencies in microseconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	if subscribeTimeCounterHistogram == nil {
		subscribeTimeCounterHistogram = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name: fmt.Sprintf("%ssubscribe_message_duration_seconds", DefaultMetricPrefix),
				Help: "Request time in seconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	for _, collector := range []prometheus.Collector{subscribeOpsCounter, subscribeTimeCounterSummary, subscribeTimeCounterHistogram} {
		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
			// if already registered, skip fatal
			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
				logger.Fatal(ctx, err.Error())
			}
		}
	nm := m.buildMetric(name, labels...)
	c, ok := m.counter[nm]
	if !ok {
		nc := prometheus.NewGauge(prometheus.GaugeOpts{Name: m.buildName(name), ConstLabels: m.mapLabels(labels...)})
		m.set.MustRegister(nc)
		c = prometheusCounter{c: nc}
		m.counter[nm] = c
	}

	return c
}
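// Counter (like FloatCounter, Gauge, Histogram and Summary below) is
// get-or-register: the metric is created and registered on first use, then
// cached in the map keyed by the prefixed name plus labels from buildMetric.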

func registerClientMetrics(ctx context.Context) {
	mu.Lock()
	defer mu.Unlock()
func (m *prometheusMeter) FloatCounter(name string, labels ...string) meter.FloatCounter {
	m.Lock()
	defer m.Unlock()

	if clientOpsCounter == nil {
		clientOpsCounter = prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: fmt.Sprintf("%srequest_total", DefaultMetricPrefix),
				Help: "Requests processed, partitioned by endpoint and status",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "status"),
			},
		)
	}

	if clientTimeCounterSummary == nil {
		clientTimeCounterSummary = prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: fmt.Sprintf("%slatency_microseconds", DefaultMetricPrefix),
				Help: "Request latencies in microseconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	if clientTimeCounterHistogram == nil {
		clientTimeCounterHistogram = prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name: fmt.Sprintf("%srequest_duration_seconds", DefaultMetricPrefix),
				Help: "Request time in seconds, partitioned by endpoint",
			},
			[]string{
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "name"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "version"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "id"),
				fmt.Sprintf("%s%s", DefaultLabelPrefix, "endpoint"),
			},
		)
	}

	for _, collector := range []prometheus.Collector{clientOpsCounter, clientTimeCounterSummary, clientTimeCounterHistogram} {
		if err := prometheus.DefaultRegisterer.Register(collector); err != nil {
			// if already registered, skip fatal
			if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
				logger.Fatal(ctx, err.Error())
			}
		}
	nm := m.buildMetric(name, labels...)
	c, ok := m.floatCounter[nm]
	if !ok {
		nc := prometheus.NewGauge(prometheus.GaugeOpts{Name: m.buildName(name), ConstLabels: m.mapLabels(labels...)})
		m.set.MustRegister(nc)
		c = prometheusFloatCounter{c: nc}
		m.floatCounter[nm] = c
	}

	return c
}

type wrapper struct {
	options Options
	callFunc client.CallFunc
	client.Client
func (m *prometheusMeter) Gauge(name string, fn func() float64, labels ...string) meter.Gauge {
	m.Lock()
	defer m.Unlock()

	nm := m.buildMetric(name, labels...)
	c, ok := m.gauge[nm]
	if !ok {
		nc := prometheus.NewGauge(prometheus.GaugeOpts{Name: m.buildName(name), ConstLabels: m.mapLabels(labels...)})
		m.set.MustRegister(nc)
		c = prometheusGauge{c: nc}
		m.gauge[nm] = c
	}

	return c
}

func NewClientWrapper(opts ...Option) client.Wrapper {
	options := Options{Context: context.Background()}
func (m *prometheusMeter) Histogram(name string, labels ...string) meter.Histogram {
	m.Lock()
	defer m.Unlock()

	nm := m.buildMetric(name, labels...)
	c, ok := m.histogram[nm]
	if !ok {
		nc := prometheus.NewHistogram(prometheus.HistogramOpts{Name: m.buildName(name), ConstLabels: m.mapLabels(labels...)})
		m.set.MustRegister(nc)
		c = prometheusHistogram{c: nc}
		m.histogram[nm] = c
	}

	return c
}

func (m *prometheusMeter) Summary(name string, labels ...string) meter.Summary {
	m.Lock()
	defer m.Unlock()

	nm := m.buildMetric(name, labels...)
	c, ok := m.summary[nm]
	if !ok {
		nc := prometheus.NewSummary(prometheus.SummaryOpts{Name: m.buildName(name), ConstLabels: m.mapLabels(labels...)})
		m.set.MustRegister(nc)
		c = prometheusSummary{c: nc}
		m.summary[nm] = c
	}

	return c
}

func (m *prometheusMeter) SummaryExt(name string, window time.Duration, quantiles []float64, labels ...string) meter.Summary {
	m.Lock()
	defer m.Unlock()

	nm := m.buildMetric(name, labels...)
	c, ok := m.summary[nm]
	if !ok {
		nc := prometheus.NewSummary(prometheus.SummaryOpts{Name: m.buildName(name), ConstLabels: m.mapLabels(labels...)})
		m.set.MustRegister(nc)
		c = prometheusSummary{c: nc}
		m.summary[nm] = c
	}

	return c
}

func (m *prometheusMeter) Init(opts ...meter.Option) error {
	for _, o := range opts {
		o(&m.opts)
	}

	return nil
}

func (m *prometheusMeter) Write(w io.Writer, opts ...meter.Option) error {
	options := m.opts
	for _, o := range opts {
		o(&options)
	}

	registerClientMetrics(options.Context)
	registerPublishMetrics(options.Context)

	return func(c client.Client) client.Client {
		handler := &wrapper{
			options: options,
			Client: c,
		}

		return handler
	}
}

func NewCallWrapper(opts ...Option) client.CallWrapper {
	options := Options{Context: context.Background()}
	for _, o := range opts {
		o(&options)
	if options.WriteProcessMetrics || options.WriteFDMetrics {
		c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
		_ = m.set.Register(c)
	}

	registerClientMetrics(options.Context)

	return func(fn client.CallFunc) client.CallFunc {
		handler := &wrapper{
			options: options,
			callFunc: fn,
		}

		return handler.CallFunc
	}
}

func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())

	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
		us := v * 1000000 // make microseconds
		clientTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
		clientTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
	}))
	defer timer.ObserveDuration()

	err := w.callFunc(ctx, addr, req, rsp, opts)
	if err == nil {
		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
	} else {
		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
	}

	return err

}

func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())

	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
		us := v * 1000000 // make microseconds
		clientTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
		clientTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
	}))
	defer timer.ObserveDuration()

	err := w.Client.Call(ctx, req, rsp, opts...)
	if err == nil {
		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
	} else {
		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
	}

	return err
}

func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())

	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
		us := v * 1000000 // make microseconds
		clientTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
		clientTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
	}))
	defer timer.ObserveDuration()

	stream, err := w.Client.Stream(ctx, req, opts...)
	if err == nil {
		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
	} else {
		clientOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
	}

	return stream, err
}

func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
	endpoint := p.Topic()

	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
		us := v * 1000000 // make microseconds
		publishTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
		publishTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
	}))
	defer timer.ObserveDuration()

	err := w.Client.Publish(ctx, p, opts...)
	if err == nil {
		publishOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
	} else {
		publishOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
	}

	return err
}

func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
	options := Options{Context: context.Background()}
	for _, o := range opts {
		o(&options)
	}
	registerServerMetrics(options.Context)

	handler := &wrapper{
		options: options,
	}

	return handler.HandlerFunc
}

func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
	return func(ctx context.Context, req server.Request, rsp interface{}) error {
		endpoint := req.Endpoint()

		timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
			us := v * 1000000 // make microseconds
			serverTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
			serverTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
		}))
		defer timer.ObserveDuration()

		err := fn(ctx, req, rsp)
		if err == nil {
			serverOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
		} else {
			serverOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
		}

	mfs, err := m.set.(prometheus.Gatherer).Gather()
	if err != nil {
		return err
	}

	enc := expfmt.NewEncoder(w, expfmt.FmtText)
	for _, mf := range mfs {
		_ = enc.Encode(mf)
	}

	if closer, ok := enc.(io.Closer); ok {
		_ = closer.Close()
	}

	return nil
}
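// Write type-asserts the registerer to prometheus.Gatherer (true for the
// default registry and for registries created with prometheus.NewRegistry),
// gathers all metric families and encodes them in the Prometheus text
// exposition format.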

func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
	options := Options{Context: context.Background()}
func (m *prometheusMeter) Clone(opts ...meter.Option) meter.Meter {
	options := m.opts
	for _, o := range opts {
		o(&options)
	}

	registerSubscribeMetrics(options.Context)

	handler := &wrapper{
		options: options,
	}

	return handler.SubscriberFunc
}

func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
	return func(ctx context.Context, msg server.Message) error {
		endpoint := msg.Topic()

		timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
			us := v * 1000000 // make microseconds
			subscribeTimeCounterSummary.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(us)
			subscribeTimeCounterHistogram.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint).Observe(v)
		}))
		defer timer.ObserveDuration()

		err := fn(ctx, msg)
		if err == nil {
			subscribeOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "success").Inc()
		} else {
			subscribeOpsCounter.WithLabelValues(w.options.Name, w.options.Version, w.options.ID, endpoint, "failure").Inc()
		}

		return err
	return &prometheusMeter{
		set: m.set,
		opts: options,
		counter: m.counter,
		gauge: m.gauge,
		histogram: m.histogram,
		summary: m.summary,
	}
}

func (m *prometheusMeter) Options() meter.Options {
	return m.opts
}

func (m *prometheusMeter) String() string {
	return "prometheus"
}

func (m *prometheusMeter) Set(opts ...meter.Option) meter.Meter {
	nm := &prometheusMeter{opts: m.opts}
	for _, o := range opts {
		o(&nm.opts)
	}
	nm.set = prometheus.NewRegistry()
	return nm
}

type prometheusCounter struct {
	c prometheus.Gauge
}

func (c prometheusCounter) Add(n int) {
	c.c.Add(float64(n))
}

func (c prometheusCounter) Dec() {
	c.c.Dec()
}

func (c prometheusCounter) Inc() {
	c.c.Inc()
}

func (c prometheusCounter) Get() uint64 {
	m := &dto.Metric{}
	if err := c.c.Write(m); err != nil {
		return 0
	}
	return uint64(m.GetGauge().GetValue())
}

func (c prometheusCounter) Set(n uint64) {
	c.c.Set(float64(n))
}
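// prometheusCounter is backed by a prometheus Gauge rather than a Counter
// because the meter.Counter interface exposes Dec and Set, which
// prometheus.Counter does not provide; Get reads the value back via dto.Metric.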

type prometheusFloatCounter struct {
	c prometheus.Gauge
}

func (c prometheusFloatCounter) Add(n float64) {
}

func (c prometheusFloatCounter) Get() float64 {
	return 0
}

func (c prometheusFloatCounter) Set(n float64) {
}

func (c prometheusFloatCounter) Sub(n float64) {
}

type prometheusGauge struct {
	c prometheus.Gauge
}

func (c prometheusGauge) Get() float64 {
	return 0
}

type prometheusHistogram struct {
	c prometheus.Histogram
}

func (c prometheusHistogram) Reset() {
}

func (c prometheusHistogram) Update(n float64) {
}

func (c prometheusHistogram) UpdateDuration(n time.Time) {
}

type prometheusSummary struct {
	c prometheus.Summary
}

func (c prometheusSummary) Update(n float64) {
}

func (c prometheusSummary) UpdateDuration(n time.Time) {
}