diff --git a/go.sum b/go.sum index eeb0b07..45704a8 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b h1:t7+Qdd/GHtUBN6D71CwwOEfsZmK0QQ/up3M6gaoaHQ4= github.com/twmb/franz-go v0.10.3-0.20210825060253-d5e80b38ca2b/go.mod h1:Txc5/v0DIKGcdCa1VZhyasECMJ4svN/LHUKYUPYihL0= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210823212011-0d01f7456b4d/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd h1:8TvJbg0dnOaq4Oex1wnYejPokyUQ4zG2OQUfD76yq4M= -github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210825163214-e185676761dd/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a h1:UBO58MRI/GH5eD5+fKXZoJMGyEcsUs9LQnHJbNL0yKs= github.com/twmb/franz-go/pkg/kmsg v0.0.0-20210826221812-c6df11da978a/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= @@ -92,7 +90,6 @@ golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -131,7 +128,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/kgo.go b/kgo.go index 1db6568..d6543c3 100644 --- a/kgo.go +++ b/kgo.go @@ -341,7 +341,7 @@ func (k *kBroker) Subscribe(ctx context.Context, topic string, handler broker.Ha // kgo.KeepControlRecords(), kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()), kgo.FetchIsolationLevel(kgo.ReadUncommitted()), - // kgo.WithHooks(&metrics{meter: k.opts.Meter}), + kgo.WithHooks(&metrics{meter: k.opts.Meter}), // TODO: must set https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#OnRevoked ) diff --git a/metrics.go b/metrics.go index 0584da6..e03efd1 100644 --- a/metrics.go +++ b/metrics.go @@ -1,240 +1,113 @@ package kgo -/* import ( "net" - "net/http" "strconv" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/twmb/franz-go/pkg/kgo" "github.com/unistack-org/micro/v3/meter" ) type metrics struct { meter meter.Meter - - connects *prometheus.CounterVec - connectErrs *prometheus.CounterVec - disconnects *prometheus.CounterVec - - writeErrs *prometheus.CounterVec - writeBytes *prometheus.CounterVec - writeWaits *prometheus.HistogramVec - writeTimings *prometheus.HistogramVec - - readErrs *prometheus.CounterVec - readBytes *prometheus.CounterVec - readWaits *prometheus.HistogramVec - readTimings *prometheus.HistogramVec - - throttles *prometheus.HistogramVec - - produceBatchesUncompressed *prometheus.CounterVec - produceBatchesCompressed *prometheus.CounterVec - - fetchBatchesUncompressed *prometheus.CounterVec - fetchBatchesCompressed *prometheus.CounterVec } var ( _ kgo.HookBrokerConnect = &metrics{} _ kgo.HookBrokerDisconnect = &metrics{} - _ kgo.HookBrokerE2E = &metrics{} _ kgo.HookBrokerRead = &metrics{} _ kgo.HookBrokerThrottle = &metrics{} _ kgo.HookBrokerWrite = &metrics{} _ kgo.HookFetchBatchRead = &metrics{} _ kgo.HookProduceBatchWritten = &metrics{} - - /* - HookFetchRecordBuffered - HookFetchRecordUnbuffered - HookGroupManageError - HookNewClient - HookProduceRecordBuffered - HookProduceRecordUnbuffered -*/ -/* + _ kgo.HookGroupManageError = &metrics{} ) -func (m *Metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { +const ( + metricBrokerConnects = "broker_connects_total" + metricBrokerDisconnects = "broker_disconnects_total" + + metricBrokerWriteErrors = "broker_write_errors_total" + metricBrokerWriteBytes = "broker_write_bytes_total" + metricBrokerWriteWaitLatencies = "broker_write_wait_latencies" + metricBrokerWriteLatencies = "broker_write_latencies" + + metricBrokerReadErrors = "broker_read_errors_total" + metricBrokerReadBytes = "broker_read_bytes_total" + metricBrokerReadWaitLatencies = "broker_read_wait_latencies" + metricBrokerReadLatencies = "broker_read_latencies" + + metricBrokerThrottleLatencies = "broker_throttle_latencies" + + metricBrokerProduceBytesCompressed = "produce_bytes_compressed_total" + metricBrokerProduceBytesUncompressed = "produce_bytes_uncompressed_total" + metricBrokerFetchBytesUncompressed = "broker_fetch_bytes_uncompressed_total" + metricBrokerFetchBytesCompressed = "broker_fetch_bytes_compressed_total" + + metricBrokerGroupErrors = "broker_group_errors_total" + + labelNode = "node_id" + labelSuccess = "success" + labelFaulure = "failure" + labelStatus = "status" + labelTopic = "topic" +) + +func (m *metrics) OnGroupManageError(err error) { + m.meter.Counter(metricBrokerGroupErrors).Inc() +} + +func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { - m.connectErrs.WithLabelValues(node).Inc() + m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc() return } - m.connects.WithLabelValues(node).Inc() + m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc() } -func (m *Metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { +func (m *metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { node := strconv.Itoa(int(meta.NodeID)) - m.disconnects.WithLabelValues(node).Inc() + m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc() } -func (m *Metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { +func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { - m.writeErrs.WithLabelValues(node).Inc() + m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc() return } - m.writeBytes.WithLabelValues(node).Add(float64(bytesWritten)) - m.writeWaits.WithLabelValues(node).Observe(writeWait.Seconds()) - m.writeTimings.WithLabelValues(node).Observe(timeToWrite.Seconds()) + m.meter.Counter(metricBrokerWriteBytes, labelNode, node).Add(bytesWritten) + m.meter.Histogram(metricBrokerWriteWaitLatencies, labelNode, node).Update(writeWait.Seconds()) + m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds()) } -func (m *Metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) { +func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { - m.readErrs.WithLabelValues(node).Inc() + m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc() return } - m.readBytes.WithLabelValues(node).Add(float64(bytesRead)) - m.readWaits.WithLabelValues(node).Observe(readWait.Seconds()) - m.readTimings.WithLabelValues(node).Observe(timeToRead.Seconds()) + m.meter.Counter(metricBrokerReadBytes, labelNode, node).Add(bytesRead) + + m.meter.Histogram(metricBrokerReadWaitLatencies, labelNode, node).Update(readWait.Seconds()) + m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds()) } -func (m *Metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) { +func (m *metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) { node := strconv.Itoa(int(meta.NodeID)) - m.throttles.WithLabelValues(node).Observe(throttleInterval.Seconds()) + m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds()) } -func (m *Metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.ProduceBatchMetrics) { +func (m *metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) { node := strconv.Itoa(int(meta.NodeID)) - m.produceBatchesUncompressed.WithLabelValues(node, topic).Add(float64(metrics.UncompressedBytes)) - m.produceBatchesCompressed.WithLabelValues(node, topic).Add(float64(metrics.CompressedBytes)) + m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes) + m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes) } -func (m *Metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, metrics kgo.FetchBatchMetrics) { +func (m *metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) { node := strconv.Itoa(int(meta.NodeID)) - m.fetchBatchesUncompressed.WithLabelValues(node, topic).Add(float64(metrics.UncompressedBytes)) - m.fetchBatchesCompressed.WithLabelValues(node, topic).Add(float64(metrics.CompressedBytes)) + m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes) + m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes) } - -func NewMetrics(namespace string) (m *Metrics) { - reg := prometheus.NewRegistry() - factory := promauto.With(reg) - - reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - reg.MustRegister(prometheus.NewGoCollector()) - - return &Metrics{ - reg: reg, - - // connects and disconnects - - connects: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "connects_total", - Help: "Total number of connections opened, by broker", - }, []string{"node_id"}), - - connectErrs: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "connect_errors_total", - Help: "Total number of connection errors, by broker", - }, []string{"node_id"}), - - disconnects: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "disconnects_total", - Help: "Total number of connections closed, by broker", - }, []string{"node_id"}), - - // write - - writeErrs: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "write_errors_total", - Help: "Total number of write errors, by broker", - }, []string{"node_id"}), - - writeBytes: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "write_bytes_total", - Help: "Total number of bytes written, by broker", - }, []string{"node_id"}), - - writeWaits: factory.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "write_wait_latencies", - Help: "Latency of time spent waiting to write to Kafka, in seconds by broker", - Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20), - }, []string{"node_id"}), - - writeTimings: factory.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "write_latencies", - Help: "Latency of time spent writing to Kafka, in seconds by broker", - Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20), - }, []string{"node_id"}), - - // read - - readErrs: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "read_errors_total", - Help: "Total number of read errors, by broker", - }, []string{"node_id"}), - - readBytes: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "read_bytes_total", - Help: "Total number of bytes read, by broker", - }, []string{"node_id"}), - - readWaits: factory.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "read_wait_latencies", - Help: "Latency of time spent waiting to read from Kafka, in seconds by broker", - Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20), - }, []string{"node_id"}), - - readTimings: factory.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "read_latencies", - Help: "Latency of time spent reading from Kafka, in seconds by broker", - Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20), - }, []string{"node_id"}), - - // throttles - - throttles: factory.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: namespace, - Name: "throttle_latencies", - Help: "Latency of Kafka request throttles, in seconds by broker", - Buckets: prometheus.ExponentialBuckets(0.0001, 1.5, 20), - }, []string{"node_id"}), - - // produces & consumes - - produceBatchesUncompressed: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "produce_bytes_uncompressed_total", - Help: "Total number of uncompressed bytes produced, by broker and topic", - }, []string{"broker", "topic"}), - - produceBatchesCompressed: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "produce_bytes_compressed_total", - Help: "Total number of compressed bytes actually produced, by topic and partition", - }, []string{"topic", "partition"}), - - fetchBatchesUncompressed: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "fetch_bytes_uncompressed_total", - Help: "Total number of uncompressed bytes fetched, by topic and partition", - }, []string{"topic", "partition"}), - - fetchBatchesCompressed: factory.NewCounterVec(prometheus.CounterOpts{ - Namespace: namespace, - Name: "fetch_bytes_compressed_total", - Help: "Total number of compressed bytes actually fetched, by topic and partition", - }, []string{"topic", "partition"}), - } -} - -*/