Compare commits

..

2 Commits

Author SHA1 Message Date
7329bc23bc export lag for all partition, not only owned
Some checks failed
build / test (push) Failing after 1m14s
build / lint (push) Successful in 9m28s
codeql / analyze (go) (push) Failing after 14m55s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-02 23:01:04 +03:00
c240631cdb fixup panic
Some checks failed
build / test (push) Failing after 1m32s
codeql / analyze (go) (push) Failing after 2m37s
build / lint (push) Successful in 9m31s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-05-02 10:32:33 +03:00
2 changed files with 8 additions and 6 deletions

8
kgo.go
View File

@@ -98,7 +98,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
select {
case <-ctx.Done():
if ctx.Err() != nil {
sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
if sp != nil {
sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
}
}
return nil, ctx.Err()
default:
@@ -107,7 +109,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
err = c.Ping(ctx) // check connectivity to cluster
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
if sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
return nil, err
}
}

View File

@@ -106,10 +106,8 @@ func (s *subscriber) poll(ctx context.Context) {
}
s.Lock()
for tp := range s.consumers {
if v, ok := lmap[tp.p]; ok {
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(tp.p)), "lag", strconv.Itoa(int(v.Lag)))
}
for p, l := range lmap {
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag)))
}
s.Unlock()