package kgo import ( "net" "strconv" "time" "github.com/twmb/franz-go/pkg/kgo" "go.unistack.org/micro/v4/meter" ) /* func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { handler := &wrapper{ opts: NewOptions(opts...), } return handler.SubscriberFunc } func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { return func(ctx context.Context, msg server.Message) error { endpoint := msg.Topic() labels := make([]string, 0, 4) labels = append(labels, labelEndpoint, endpoint) w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Inc() ts := time.Now() err := fn(ctx, msg) te := time.Since(ts) w.opts.Meter.Counter(SubscribeMessageInflight, labels...).Dec() w.opts.Meter.Summary(SubscribeMessageLatencyMicroseconds, labels...).Update(te.Seconds()) w.opts.Meter.Histogram(SubscribeMessageDurationSeconds, labels...).Update(te.Seconds()) if err == nil { labels = append(labels, labelStatus, labelSuccess) } else { labels = append(labels, labelStatus, labelFailure) } w.opts.Meter.Counter(SubscribeMessageTotal, labels...).Inc() return err } } */ type metrics struct { meter meter.Meter } var ( _ kgo.HookBrokerConnect = &metrics{} _ kgo.HookBrokerDisconnect = &metrics{} _ kgo.HookBrokerRead = &metrics{} _ kgo.HookBrokerThrottle = &metrics{} _ kgo.HookBrokerWrite = &metrics{} _ kgo.HookFetchBatchRead = &metrics{} _ kgo.HookProduceBatchWritten = &metrics{} _ kgo.HookGroupManageError = &metrics{} ) const ( metricBrokerConnects = "broker_connects_total" metricBrokerDisconnects = "broker_disconnects_total" metricBrokerWriteErrors = "broker_write_errors_total" metricBrokerWriteBytes = "broker_write_bytes_total" metricBrokerWriteWaitLatencies = "broker_write_wait_latencies" metricBrokerWriteLatencies = "broker_write_latencies" metricBrokerReadErrors = "broker_read_errors_total" metricBrokerReadBytes = "broker_read_bytes_total" metricBrokerReadWaitLatencies = "broker_read_wait_latencies" metricBrokerReadLatencies = "broker_read_latencies" metricBrokerThrottleLatencies = "broker_throttle_latencies" metricBrokerProduceBytesCompressed = "broker_produce_bytes_compressed_total" metricBrokerProduceBytesUncompressed = "broker_produce_bytes_uncompressed_total" metricBrokerFetchBytesCompressed = "broker_consume_bytes_compressed_total" metricBrokerFetchBytesUncompressed = "broker_consume_bytes_uncompressed_total" metricBrokerGroupErrors = "broker_group_errors_total" labelNode = "node_id" labelSuccess = "success" labelFaulure = "failure" labelStatus = "status" labelTopic = "topic" ) func (m *metrics) OnGroupManageError(err error) { m.meter.Counter(metricBrokerGroupErrors).Inc() } func (m *metrics) OnBrokerConnect(meta kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelFaulure).Inc() return } m.meter.Counter(metricBrokerConnects, labelNode, node, labelStatus, labelSuccess).Inc() } func (m *metrics) OnBrokerDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { node := strconv.Itoa(int(meta.NodeID)) m.meter.Counter(metricBrokerDisconnects, labelNode, node).Inc() } func (m *metrics) OnBrokerWrite(meta kgo.BrokerMetadata, _ int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { m.meter.Counter(metricBrokerWriteErrors, labelNode, node).Inc() return } m.meter.Counter(metricBrokerWriteBytes, labelNode, node).Add(bytesWritten) m.meter.Histogram(metricBrokerWriteWaitLatencies, labelNode, node).Update(writeWait.Seconds()) m.meter.Histogram(metricBrokerWriteLatencies, labelNode, node).Update(timeToWrite.Seconds()) } func (m *metrics) OnBrokerRead(meta kgo.BrokerMetadata, _ int16, bytesRead int, readWait, timeToRead time.Duration, err error) { node := strconv.Itoa(int(meta.NodeID)) if err != nil { m.meter.Counter(metricBrokerReadErrors, labelNode, node).Inc() return } m.meter.Counter(metricBrokerReadBytes, labelNode, node).Add(bytesRead) m.meter.Histogram(metricBrokerReadWaitLatencies, labelNode, node).Update(readWait.Seconds()) m.meter.Histogram(metricBrokerReadLatencies, labelNode, node).Update(timeToRead.Seconds()) } func (m *metrics) OnBrokerThrottle(meta kgo.BrokerMetadata, throttleInterval time.Duration, _ bool) { node := strconv.Itoa(int(meta.NodeID)) m.meter.Histogram(metricBrokerThrottleLatencies, labelNode, node).Update(throttleInterval.Seconds()) } func (m *metrics) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.ProduceBatchMetrics) { node := strconv.Itoa(int(meta.NodeID)) m.meter.Counter(metricBrokerProduceBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes) m.meter.Counter(metricBrokerProduceBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes) } func (m *metrics) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, kmetrics kgo.FetchBatchMetrics) { node := strconv.Itoa(int(meta.NodeID)) m.meter.Counter(metricBrokerFetchBytesUncompressed, labelNode, node, labelTopic, topic).Add(kmetrics.UncompressedBytes) m.meter.Counter(metricBrokerFetchBytesCompressed, labelNode, node, labelTopic, topic).Add(kmetrics.CompressedBytes) }