new kgo version
All checks were successful
sync / sync (push) Has been skipped
coverage / build (push) Successful in 4m56s
test / test (push) Successful in 4m59s

update deps

fixup race conditions

add kfake usage

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-05-11 01:49:38 +03:00
parent ccd912adb2
commit e34f57a515
6 changed files with 104 additions and 74 deletions

View File

@@ -146,7 +146,12 @@ func (s *Subscriber) poll(ctx context.Context) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tps := tp{p.Topic, p.Partition}
s.consumers[tps].recs <- p
s.Lock()
c := s.consumers[tps]
s.Unlock()
if c != nil {
c.recs <- p
}
})
s.c.AllowRebalance()
}
@@ -160,11 +165,15 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
for topic, partitions := range lost {
for _, partition := range partitions {
tps := tp{topic, partition}
s.Lock()
pc, ok := s.consumers[tps]
s.Unlock()
if !ok {
continue
}
s.Lock()
delete(s.consumers, tps)
s.Unlock()
close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))