new kgo version
Some checks failed
sync / sync (push) Has been cancelled
coverage / build (push) Failing after 1m31s
test / test (push) Failing after 18m29s

update deps

fixup race conditions

add kfake usage

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-05-11 01:49:38 +03:00
parent b538ef82b5
commit 71e8a13c70

View File

@@ -147,9 +147,9 @@ func (s *Subscriber) poll(ctx context.Context) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) { fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tps := tp{p.Topic, p.Partition} tps := tp{p.Topic, p.Partition}
s.Lock() s.mu.Lock()
c := s.consumers[tps] c := s.consumers[tps]
s.Unlock() s.mu.Unlock()
if c != nil { if c != nil {
c.recs <- p c.recs <- p
} }
@@ -166,15 +166,15 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
for topic, partitions := range lost { for topic, partitions := range lost {
for _, partition := range partitions { for _, partition := range partitions {
tps := tp{topic, partition} tps := tp{topic, partition}
s.Lock() s.mu.Lock()
pc, ok := s.consumers[tps] pc, ok := s.consumers[tps]
s.Unlock() s.mu.Unlock()
if !ok { if !ok {
continue continue
} }
s.Lock() s.mu.Lock()
delete(s.consumers, tps) delete(s.consumers, tps)
s.Unlock() s.mu.Unlock()
close(pc.quit) close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) { if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))