add metrics

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-08-30 20:02:32 +03:00
parent 54ecafc4b1
commit 8dfa4f2a6d
3 changed files with 60 additions and 191 deletions

4
go.sum
View File

@ -59,8 +59,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b h1:t7+Qdd/GHtUBN6D71CwwOEfsZmK0QQ/up3M6gaoaHQ4=
github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b/go.mod h1:Txc5/v0DIKGcdCa1VZhyasECMJ4svN/LHUKYUPYihL0=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd h1:8TvJbg0dnOaq4Oex1wnYejPokyUQ4zG2OQUfD76yq4M=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a h1:UBO58MRI/GH5eD5+fKXZoJMGyEcsUs9LQnHJbNL0yKs=
github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
@ -92,7 +90,6 @@ golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -131,7 +128,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=

2
kgo.go
View File

@ -341,7 +341,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha
// kgo.KeepControlRecords(),
kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
kgo.FetchIsolationLevel(kgo.ReadUncommitted()),
// kgo.WithHooks(&metrics{meter: k.opts.Meter}),
kgo.WithHooks(&metrics{meter: k.opts.Meter}),
// TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked
)

View File

@ -1,240 +1,113 @@
package kgo
/*
import (
"net"
"net/http"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/unistack-org/micro/v3/meter"
)
type metrics struct {
meter meter.Meter
connects *prometheus.CounterVec
connectErrs *prometheus.CounterVec
disconnects *prometheus.CounterVec
writeErrs *prometheus.CounterVec
writeBytes *prometheus.CounterVec
writeWaits *prometheus.HistogramVec
writeTimings *prometheus.HistogramVec
readErrs *prometheus.CounterVec
readBytes *prometheus.CounterVec
readWaits *prometheus.HistogramVec
readTimings *prometheus.HistogramVec
throttles *prometheus.HistogramVec
produceBatchesUncompressed *prometheus.CounterVec
produceBatchesCompressed *prometheus.CounterVec
fetchBatchesUncompressed *prometheus.CounterVec
fetchBatchesCompressed *prometheus.CounterVec
}
var (
_ kgo.HookBrokerConnect = &metrics{}
_ kgo.HookBrokerDisconnect = &metrics{}
_ kgo.HookBrokerE2E = &metrics{}
_ kgo.HookBrokerRead = &metrics{}
_ kgo.HookBrokerThrottle = &metrics{}
_ kgo.HookBrokerWrite = &metrics{}
_ kgo.HookFetchBatchRead = &metrics{}
_ kgo.HookProduceBatchWritten = &metrics{}
/*
HookFetchRecordBuffered
HookFetchRecordUnbuffered
HookGroupManageError
HookNewClient
HookProduceRecordBuffered
HookProduceRecordUnbuffered
*/
/*
_ kgo.HookGroupManageError = &metrics{}
)
func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
const (
metricBrokerConnects = "broker_connects_total"
metricBrokerDisconnects = "broker_disconnects_total"
metricBrokerWriteErrors = "broker_write_errors_total"
metricBrokerWriteBytes = "broker_write_bytes_total"
metricBrokerWriteWaitLatencies = "broker_write_wait_latencies"
metricBrokerWriteLatencies = "broker_write_latencies"
metricBrokerReadErrors = "broker_read_errors_total"
metricBrokerReadBytes = "broker_read_bytes_total"
metricBrokerReadWaitLatencies = "broker_read_wait_latencies"
metricBrokerReadLatencies = "broker_read_latencies"
metricBrokerThrottleLatencies = "broker_throttle_latencies"
metricBrokerProduceBytesCompressed = "produce_bytes_compressed_total"
metricBrokerProduceBytesUncompressed = "produce_bytes_uncompressed_total"
metricBrokerFetchBytesUncompressed = "broker_fetch_bytes_uncompressed_total"
metricBrokerFetchBytesCompressed = "broker_fetch_bytes_compressed_total"
metricBrokerGroupErrors = "broker_group_errors_total"
labelNode = "node_id"
labelSuccess = "success"
labelFaulure = "failure"
labelStatus = "status"
labelTopic = "topic"
)
func (m *metrics) OnGroupManageError(err error) {
m.meter.Counter(metricBrokerGroupErrors).Inc()
}
func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.connectErrs.WithLabelValues(node).Inc()
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc()
return
}
m.connects.WithLabelValues(node).Inc()
m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc()
}
func (m *Metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
func (m *metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
node := strconv.Itoa(int(meta.NodeID))
m.disconnects.WithLabelValues(node).Inc()
m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc()
}
func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.writeErrs.WithLabelValues(node).Inc()
m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc()
return
}
m.writeBytes.WithLabelValues(node).Add(float64(bytesWritten))
m.writeWaits.WithLabelValues(node).Observe(writeWait.Seconds())
m.writeTimings.WithLabelValues(node).Observe(timeToWrite.Seconds())
m.meter.Counter(metricBrokerWriteBytes, labelNode, node).Add(bytesWritten)
m.meter.Histogram(metricBrokerWriteWaitLatencies, labelNode, node).Update(writeWait.Seconds())
m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds())
}
func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
node := strconv.Itoa(int(meta.NodeID))
if err != nil {
m.readErrs.WithLabelValues(node).Inc()
m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc()
return
}
m.readBytes.WithLabelValues(node).Add(float64(bytesRead))
m.readWaits.WithLabelValues(node).Observe(readWait.Seconds())
m.readTimings.WithLabelValues(node).Observe(timeToRead.Seconds())
m.meter.Counter(metricBrokerReadBytes, labelNode, node).Add(bytesRead)
m.meter.Histogram(metricBrokerReadWaitLatencies, labelNode, node).Update(readWait.Seconds())
m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds())
}
func (m *Metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
func (m *metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) {
node := strconv.Itoa(int(meta.NodeID))
m.throttles.WithLabelValues(node).Observe(throttleInterval.Seconds())
m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds())
}
func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.ProduceBatchMetrics) {
func (m *metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
m.produceBatchesUncompressed.WithLabelValues(node, topic).Add(float64(metrics.UncompressedBytes))
m.produceBatchesCompressed.WithLabelValues(node, topic).Add(float64(metrics.CompressedBytes))
m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)
}
func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.FetchBatchMetrics) {
func (m *metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) {
node := strconv.Itoa(int(meta.NodeID))
m.fetchBatchesUncompressed.WithLabelValues(node, topic).Add(float64(metrics.UncompressedBytes))
m.fetchBatchesCompressed.WithLabelValues(node, topic).Add(float64(metrics.CompressedBytes))
m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes)
m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes)
}
func NewMetrics(namespace string) (m *Metrics) {
reg := prometheus.NewRegistry()
factory := promauto.With(reg)
reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
reg.MustRegister(prometheus.NewGoCollector())
return &Metrics{
reg: reg,
// connects and disconnects
connects: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "connects_total",
Help: "Total number of connections opened, by broker",
}, []string{"node_id"}),
connectErrs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "connect_errors_total",
Help: "Total number of connection errors, by broker",
}, []string{"node_id"}),
disconnects: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "disconnects_total",
Help: "Total number of connections closed, by broker",
}, []string{"node_id"}),
// write
writeErrs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "write_errors_total",
Help: "Total number of write errors, by broker",
}, []string{"node_id"}),
writeBytes: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "write_bytes_total",
Help: "Total number of bytes written, by broker",
}, []string{"node_id"}),
writeWaits: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "write_wait_latencies",
Help: "Latency of time spent waiting to write to Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
writeTimings: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "write_latencies",
Help: "Latency of time spent writing to Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
// read
readErrs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "read_errors_total",
Help: "Total number of read errors, by broker",
}, []string{"node_id"}),
readBytes: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "read_bytes_total",
Help: "Total number of bytes read, by broker",
}, []string{"node_id"}),
readWaits: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "read_wait_latencies",
Help: "Latency of time spent waiting to read from Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
readTimings: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "read_latencies",
Help: "Latency of time spent reading from Kafka, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
// throttles
throttles: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Name: "throttle_latencies",
Help: "Latency of Kafka request throttles, in seconds by broker",
Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20),
}, []string{"node_id"}),
// produces & consumes
produceBatchesUncompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "produce_bytes_uncompressed_total",
Help: "Total number of uncompressed bytes produced, by broker and topic",
}, []string{"broker", "topic"}),
produceBatchesCompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "produce_bytes_compressed_total",
Help: "Total number of compressed bytes actually produced, by topic and partition",
}, []string{"topic", "partition"}),
fetchBatchesUncompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "fetch_bytes_uncompressed_total",
Help: "Total number of uncompressed bytes fetched, by topic and partition",
}, []string{"topic", "partition"}),
fetchBatchesCompressed: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "fetch_bytes_compressed_total",
Help: "Total number of compressed bytes actually fetched, by topic and partition",
}, []string{"topic", "partition"}),
}
}
*/