fix consume/killconsume

This commit is contained in:
Evstigneev Denis
2025-11-27 13:12:57 +03:00
parent 5080a52834
commit ea7ac4378a

View File

@@ -129,9 +129,9 @@ func (s *Subscriber) poll(ctx context.Context) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
tps := tp{p.Topic, p.Partition}
s.mu.Lock()
s.mu.RLock()
c := s.consumers[tps]
s.mu.Unlock()
s.mu.RUnlock()
if c != nil {
c.recs <- p
}
@@ -150,19 +150,36 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
tps := tp{topic, partition}
s.mu.Lock()
pc, ok := s.consumers[tps]
if ok {
delete(s.consumers, tps)
}
s.mu.Unlock()
if !ok || pc == nil {
continue
}
s.mu.Lock()
delete(s.consumers, tps)
s.mu.Unlock()
close(pc.quit)
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition))
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition))
}
close(pc.quit)
wg.Add(1)
go func() { <-pc.done; wg.Done() }()
go func(c *consumer, t string, p int32) {
defer wg.Done()
timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer
defer timeout.Stop()
select {
case <-c.done:
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p))
}
case <-timeout.C:
s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p))
}
}(pc, topic, partition)
}
}
}
@@ -197,7 +214,6 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
if s.kopts.Logger.V(logger.DebugLevel) {
s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked))
}
s.killConsumers(ctx, revoked)
if err := c.CommitMarkedOffsets(ctx); err != nil {
s.mu.Lock()
tpc := make(map[tp]*consumer, len(s.consumers))
@@ -209,6 +225,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
}
}
}
s.killConsumers(ctx, revoked)
}
func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) {
@@ -251,7 +268,10 @@ func (pc *consumer) consume() {
select {
case <-pc.quit:
return
case p := <-pc.recs:
case p, ok := <-pc.recs:
if !ok {
return
}
if p.Err != nil || p.FetchPartition.Err != nil {
if p.Err != nil {
@@ -274,6 +294,11 @@ func (pc *consumer) consume() {
}
for _, record := range p.Records {
select {
case <-pc.quit:
return
default:
}
ctx, sp := pc.htracer.WithProcessSpan(record)
ts := time.Now()
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc()
@@ -300,12 +325,24 @@ func (pc *consumer) consume() {
pm.hdr.Set("Micro-Key", string(record.Key))
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle
errChan := make(chan error, 1)
go func() {
switch h := pc.handler.(type) {
case func(broker.Message) error:
err = h(pm)
errChan <- h(pm)
case func([]broker.Message) error:
err = h([]broker.Message{pm})
errChan <- h([]broker.Message{pm})
}
}()
select {
case err = <-errChan:
case <-processCtx.Done():
err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)
}
cancel()
pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec()
if err != nil {
@@ -335,6 +372,8 @@ func (pc *consumer) consume() {
continue
}
pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset))
pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition)
switch h := pc.handler.(type) {
case func(broker.Message) error: