Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
f73d9d7347 | |||
53f72f1246 |
@@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) {
|
|||||||
|
|
||||||
s.Lock()
|
s.Lock()
|
||||||
for p, l := range lmap {
|
for p, l := range lmap {
|
||||||
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag)))
|
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
|
||||||
}
|
}
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
@@ -160,7 +160,10 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
|||||||
for topic, partitions := range lost {
|
for topic, partitions := range lost {
|
||||||
for _, partition := range partitions {
|
for _, partition := range partitions {
|
||||||
tps := tp{topic, partition}
|
tps := tp{topic, partition}
|
||||||
pc := s.consumers[tps]
|
pc, ok := s.consumers[tps]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
delete(s.consumers, tps)
|
delete(s.consumers, tps)
|
||||||
close(pc.quit)
|
close(pc.quit)
|
||||||
if s.kopts.Logger.V(logger.DebugLevel) {
|
if s.kopts.Logger.V(logger.DebugLevel) {
|
||||||
|
Reference in New Issue
Block a user