Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
7329bc23bc | |||
c240631cdb |
4
kgo.go
4
kgo.go
@@ -98,8 +98,10 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
if sp != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
|
sp.SetStatus(tracer.SpanStatusError, ctx.Err().Error())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
default:
|
default:
|
||||||
c, err = kgo.NewClient(opts...)
|
c, err = kgo.NewClient(opts...)
|
||||||
@@ -107,7 +109,9 @@ func (k *Broker) connect(ctx context.Context, opts ...kgo.Opt) (*kgo.Client, err
|
|||||||
err = c.Ping(ctx) // check connectivity to cluster
|
err = c.Ping(ctx) // check connectivity to cluster
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if sp != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -106,10 +106,8 @@ func (s *subscriber) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
for tp := range s.consumers {
|
for p, l := range lmap {
|
||||||
if v, ok := lmap[tp.p]; ok {
|
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag)))
|
||||||
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(tp.p)), "lag", strconv.Itoa(int(v.Lag)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user