diff --git a/subscriber.go b/subscriber.go index 331290e..7e1376a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -129,9 +129,9 @@ func (s *Subscriber) poll(ctx context.Context) { fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.mu.Lock() + s.mu.RLock() c := s.consumers[tps] - s.mu.Unlock() + s.mu.RUnlock() if c != nil { c.recs <- p } @@ -150,19 +150,36 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) tps := tp{topic, partition} s.mu.Lock() pc, ok := s.consumers[tps] + if ok { + delete(s.consumers, tps) + } s.mu.Unlock() if !ok || pc == nil { continue } - s.mu.Lock() - delete(s.consumers, tps) - s.mu.Unlock() - close(pc.quit) + if s.kopts.Logger.V(logger.DebugLevel) { - s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition)) } + + close(pc.quit) + wg.Add(1) - go func() { <-pc.done; wg.Done() }() + go func(c *consumer, t string, p int32) { + defer wg.Done() + + timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer + defer timeout.Stop() + + select { + case <-c.done: + if s.kopts.Logger.V(logger.DebugLevel) { + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p)) + } + case <-timeout.C: + s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p)) + } + }(pc, topic, partition) } } } @@ -197,7 +214,6 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked)) } - s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { s.mu.Lock() tpc := make(map[tp]*consumer, len(s.consumers)) @@ -209,6 +225,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str } } } + s.killConsumers(ctx, revoked) } func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { @@ -251,7 +268,10 @@ func (pc *consumer) consume() { select { case <-pc.quit: return - case p := <-pc.recs: + case p, ok := <-pc.recs: + if !ok { + return + } if p.Err != nil || p.FetchPartition.Err != nil { if p.Err != nil { @@ -274,6 +294,11 @@ func (pc *consumer) consume() { } for _, record := range p.Records { + select { + case <-pc.quit: + return + default: + } ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() @@ -300,12 +325,24 @@ func (pc *consumer) consume() { pm.hdr.Set("Micro-Key", string(record.Key)) pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10)) - switch h := pc.handler.(type) { - case func(broker.Message) error: - err = h(pm) - case func([]broker.Message) error: - err = h([]broker.Message{pm}) + processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle + errChan := make(chan error, 1) + + go func() { + switch h := pc.handler.(type) { + case func(broker.Message) error: + errChan <- h(pm) + case func([]broker.Message) error: + errChan <- h([]broker.Message{pm}) + } + }() + + select { + case err = <-errChan: + case <-processCtx.Done(): + err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset) } + cancel() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() if err != nil { @@ -335,6 +372,8 @@ func (pc *consumer) consume() { continue } + pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)) + pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition) switch h := pc.handler.(type) { case func(broker.Message) error: