refactor tests && fix concurent read from map

This commit is contained in:
Evstigneev Denis
2026-01-14 14:38:12 +03:00
parent c8eeb34efe
commit f8d9e0584f
5 changed files with 109 additions and 142 deletions

View File

@@ -153,17 +153,40 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
for topic, partitions := range lost {
for _, partition := range partitions {
tps := tp{topic, partition}
s.mu.Lock()
pc, ok := s.consumers[tps]
if ok {
delete(s.consumers, tps)
}
s.mu.Unlock()
if !ok || pc == nil {
continue
}
delete(s.consumers, tps)
close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition))
}
close(pc.quit)
wg.Add(1)
go func() { <-pc.done; wg.Done() }()
go func(c *consumer, t string, p int32) {
defer wg.Done()
timeout := time.NewTimer(s.kopts.GracefulTimeout)
defer timeout.Stop()
select {
case <-c.done:
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
}
}
}(pc, topic, partition)
}
}
}