add ExposeLag option, to be able to disable lag exporting
Some checks failed
coverage / build (push) Failing after 1m35s
test / test (push) Failing after 15m18s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-10-09 23:20:10 +03:00
parent d5d1e26d7b
commit 581fecd8f1
5 changed files with 121 additions and 81 deletions

82
kgo.go
View File

@@ -6,12 +6,14 @@ import (
"errors"
"fmt"
"math/rand/v2"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.unistack.org/micro/v3/broker"
@@ -66,7 +68,7 @@ type Broker struct {
subs []*Subscriber
opts broker.Options
sync.RWMutex
mu sync.RWMutex
init bool
}
@@ -184,10 +186,72 @@ func (k *Broker) Connect(ctx context.Context) error {
return err
}
k.Lock()
k.mu.Lock()
k.c = c
k.connected.Store(1)
k.Unlock()
k.mu.Unlock()
exposeLag := false
if k.opts.Context != nil {
if v, ok := k.opts.Context.Value(exposeLagKey{}).(bool); ok && v {
exposeLag = v
}
}
if exposeLag {
var mu sync.Mutex
var lastUpdate time.Time
type pl struct {
p string
l float64
}
lag := make(map[string]map[string]pl) // topic => group => partition => lag
ac := kadm.NewClient(k.c)
updateStats := func() {
mu.Lock()
if time.Since(lastUpdate) < DefaultStatsInterval {
return
}
mu.Unlock()
k.mu.Lock()
groups := make([]string, 0, len(k.subs))
for _, g := range k.subs {
groups = append(groups, g.opts.Group)
}
k.mu.Unlock()
dgls, err := ac.Lag(ctx, groups...)
if err != nil || !dgls.Ok() {
k.opts.Logger.Error(k.opts.Context, "kgo describe group lag error", err)
return
}
for gn, dgl := range dgls {
for tn, lmap := range dgl.Lag {
if _, ok := lag[tn]; !ok {
lag[tn] = make(map[string]pl)
}
for p, l := range lmap {
lag[tn][gn] = pl{p: strconv.Itoa(int(p)), l: float64(l.Lag)}
}
}
}
}
for tn, dg := range lag {
for gn, gl := range dg {
k.opts.Meter.Gauge(semconv.BrokerGroupLag,
func() float64 { updateStats(); return gl.l },
"topic", tn,
"group", gn,
"partition", gl.p)
}
}
}
return nil
}
@@ -205,8 +269,8 @@ func (k *Broker) Disconnect(ctx context.Context) error {
ctx, span = k.opts.Tracer.Start(ctx, "Disconnect")
defer span.Finish()
k.Lock()
defer k.Unlock()
k.mu.Lock()
defer k.mu.Unlock()
select {
case <-nctx.Done():
return nctx.Err()
@@ -231,8 +295,8 @@ func (k *Broker) Disconnect(ctx context.Context) error {
}
func (k *Broker) Init(opts ...broker.Option) error {
k.Lock()
defer k.Unlock()
k.mu.Lock()
defer k.mu.Unlock()
if len(opts) == 0 && k.init {
return nil
@@ -455,9 +519,9 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
go sub.poll(ctx)
k.Lock()
k.mu.Lock()
k.subs = append(k.subs, sub)
k.Unlock()
k.mu.Unlock()
return sub, nil
}