add ExposeLag option, to be able to disable lag exporting
Some checks failed
coverage / build (push) Successful in 4m20s
test / test (push) Failing after 16m7s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-10-09 23:20:10 +03:00
parent c4f8de4aca
commit ffd10d7ea9
5 changed files with 100 additions and 59 deletions

64
kgo.go
View File

@@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"math/rand/v2"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v4/broker"
@@ -245,6 +247,68 @@ func (k *Broker) Connect(ctx context.Context) error {
k.connected.Store(1)
k.mu.Unlock()
exposeLag := false
if k.opts.Context != nil {
if v, ok := k.opts.Context.Value(exposeLagKey{}).(bool); ok && v {
exposeLag = v
}
}
if exposeLag {
var mu sync.Mutex
var lastUpdate time.Time
type pl struct {
p string
l float64
}
lag := make(map[string]map[string]pl) // topic => group => partition => lag
ac := kadm.NewClient(k.c)
updateStats := func() {
mu.Lock()
if time.Since(lastUpdate) < DefaultStatsInterval {
return
}
mu.Unlock()
k.mu.Lock()
groups := make([]string, 0, len(k.subs))
for _, g := range k.subs {
groups = append(groups, g.opts.Group)
}
k.mu.Unlock()
dgls, err := ac.Lag(ctx, groups...)
if err != nil || !dgls.Ok() {
k.opts.Logger.Error(k.opts.Context, "kgo describe group lag error", err)
return
}
for gn, dgl := range dgls {
for tn, lmap := range dgl.Lag {
if _, ok := lag[tn]; !ok {
lag[tn] = make(map[string]pl)
}
for p, l := range lmap {
lag[tn][gn] = pl{p: strconv.Itoa(int(p)), l: float64(l.Lag)}
}
}
}
}
for tn, dg := range lag {
for gn, gl := range dg {
k.opts.Meter.Gauge(semconv.BrokerGroupLag,
func() float64 { updateStats(); return gl.l },
"topic", tn,
"group", gn,
"partition", gl.p)
}
}
}
return nil
}