corrected by comments
Some checks failed
test / test (pull_request) Failing after 14m30s
lint / lint (pull_request) Failing after 14m42s
coverage / build (pull_request) Failing after 14m48s

This commit is contained in:
Evstigneev Denis
2025-12-10 15:29:55 +03:00
parent bd97d3416e
commit 2919dd5994
3 changed files with 20 additions and 27 deletions

View File

@@ -168,7 +168,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
go func(c *consumer, t string, p int32) {
defer wg.Done()
timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer
timeout := time.NewTimer(s.kopts.GracefulTimeout)
defer timeout.Stop()
select {
@@ -177,7 +177,9 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
}
}
}(pc, topic, partition)
}
@@ -325,7 +327,7 @@ func (pc *consumer) consume() {
pm.hdr.Set("Micro-Key", string(record.Key))
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle
processCtx, cancel := context.WithTimeout(ctx, pc.kopts.GracefulTimeout)
errChan := make(chan error, 1)
go func() {
@@ -340,7 +342,8 @@ func (pc *consumer) consume() {
select {
case err = <-errChan:
case <-processCtx.Done():
err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
//err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
err = processCtx.Err()
}
cancel()
@@ -372,7 +375,7 @@ func (pc *consumer) consume() {
continue
}
pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
pc.kopts.Logger.Debug(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition)
switch h := pc.handler.(type) {