fix consume/killconsume
Some checks failed
coverage / build (pull_request) Failing after 3m29s
lint / lint (pull_request) Failing after 4m59s
test / test (pull_request) Failing after 18m36s

This commit is contained in:
Evstigneev Denis
2025-11-27 13:12:57 +03:00
parent 9dcdef57f4
commit 28cb71af46

View File

@@ -116,9 +116,9 @@ func (s *Subscriber) poll(ctx context.Context) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) { fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tps := tp{p.Topic, p.Partition} tps := tp{p.Topic, p.Partition}
s.mu.Lock() s.mu.RLock()
c := s.consumers[tps] c := s.consumers[tps]
s.mu.Unlock() s.mu.RUnlock()
if c != nil { if c != nil {
c.recs <- p c.recs <- p
} }
@@ -137,19 +137,36 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
tps := tp{topic, partition} tps := tp{topic, partition}
s.mu.Lock() s.mu.Lock()
pc, ok := s.consumers[tps] pc, ok := s.consumers[tps]
if ok {
delete(s.consumers, tps)
}
s.mu.Unlock() s.mu.Unlock()
if !ok { if !ok {
continue continue
} }
s.mu.Lock()
delete(s.consumers, tps)
s.mu.Unlock()
close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) { if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition))
} }
close(pc.quit)
wg.Add(1) wg.Add(1)
go func() { <-pc.done; wg.Done() }() go func(c *consumer, t string, p int32) {
defer wg.Done()
timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer
defer timeout.Stop()
select {
case <-c.done:
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
}
}(pc, topic, partition)
} }
} }
} }
@@ -175,11 +192,11 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
if s.kopts.Logger.V(logger.DebugLevel) { if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked)) s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked))
} }
s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil { if err := c.CommitMarkedOffsets(ctx); err != nil {
s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
// s.connected.Store(0) // s.connected.Store(0)
} }
s.killConsumers(ctx, revoked)
} }
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
@@ -222,8 +239,16 @@ func (pc *consumer) consume() {
select { select {
case <-pc.quit: case <-pc.quit:
return return
case p := <-pc.recs: case p, ok := <-pc.recs:
if !ok {
return
}
for _, record := range p.Records { for _, record := range p.Records {
select {
case <-pc.quit:
return
default:
}
ctx, sp := pc.htracer.WithProcessSpan(record) ctx, sp := pc.htracer.WithProcessSpan(record)
ts := time.Now() ts := time.Now()
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
@@ -246,12 +271,25 @@ func (pc *consumer) consume() {
pm.hdr.Set("Micro-Topic", record.Topic) pm.hdr.Set("Micro-Topic", record.Topic)
pm.hdr.Set("Micro-Key", string(record.Key)) pm.hdr.Set("Micro-Key", string(record.Key))
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10)) pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
switch h := pc.handler.(type) {
case func(broker.Message) error: processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle
err = h(pm) errChan := make(chan error, 1)
case func([]broker.Message) error:
err = h([]broker.Message{pm}) go func() {
switch h := pc.handler.(type) {
case func(broker.Message) error:
errChan <- h(pm)
case func([]broker.Message) error:
errChan <- h([]broker.Message{pm})
}
}()
select {
case err = <-errChan:
case <-processCtx.Done():
err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
} }
cancel()
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
if err != nil { if err != nil {
@@ -269,7 +307,7 @@ func (pc *consumer) consume() {
ack := pm.ack ack := pm.ack
if pc.messagePool { if pc.messagePool {
messagePool.Put(p) messagePool.Put(pm)
} }
if ack { if ack {
pc.c.MarkCommitRecords(record) pc.c.MarkCommitRecords(record)
@@ -277,8 +315,7 @@ func (pc *consumer) consume() {
if sp != nil { if sp != nil {
sp.Finish() sp.Finish()
} }
// pc.connected.Store(0) pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited")
return return
} }
if sp != nil { if sp != nil {