From 28cb71af467f691dd66462355819b6a5949c32b6 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 27 Nov 2025 13:12:57 +0300 Subject: [PATCH 1/3] fix consume/killconsume --- subscriber.go | 73 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/subscriber.go b/subscriber.go index 5a7b300..97d6441 100644 --- a/subscriber.go +++ b/subscriber.go @@ -116,9 +116,9 @@ func (s *Subscriber) poll(ctx context.Context) { fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.mu.Lock() + s.mu.RLock() c := s.consumers[tps] - s.mu.Unlock() + s.mu.RUnlock() if c != nil { c.recs <- p } @@ -137,19 +137,36 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) tps := tp{topic, partition} s.mu.Lock() pc, ok := s.consumers[tps] + if ok { + delete(s.consumers, tps) + } s.mu.Unlock() if !ok { continue } - s.mu.Lock() - delete(s.consumers, tps) - s.mu.Unlock() - close(pc.quit) + if s.kopts.Logger.V(logger.DebugLevel) { - s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition)) } + + close(pc.quit) + wg.Add(1) - go func() { <-pc.done; wg.Done() }() + go func(c *consumer, t string, p int32) { + defer wg.Done() + + timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer + defer timeout.Stop() + + select { + case <-c.done: + if s.kopts.Logger.V(logger.DebugLevel) { + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p)) + } + case <-timeout.C: + s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p)) + } + }(pc, topic, partition) } } } @@ -175,11 +192,11 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked)) } - s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) // s.connected.Store(0) } + s.killConsumers(ctx, revoked) } func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { @@ -222,8 +239,16 @@ func (pc *consumer) consume() { select { case <-pc.quit: return - case p := <-pc.recs: + case p, ok := <-pc.recs: + if !ok { + return + } for _, record := range p.Records { + select { + case <-pc.quit: + return + default: + } ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() @@ -246,12 +271,25 @@ func (pc *consumer) consume() { pm.hdr.Set("Micro-Topic", record.Topic) pm.hdr.Set("Micro-Key", string(record.Key)) pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10)) - switch h := pc.handler.(type) { - case func(broker.Message) error: - err = h(pm) - case func([]broker.Message) error: - err = h([]broker.Message{pm}) + + processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle + errChan := make(chan error, 1) + + go func() { + switch h := pc.handler.(type) { + case func(broker.Message) error: + errChan <- h(pm) + case func([]broker.Message) error: + errChan <- h([]broker.Message{pm}) + } + }() + + select { + case err = <-errChan: + case <-processCtx.Done(): + err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset) } + cancel() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() if err != nil { @@ -269,7 +307,7 @@ func (pc *consumer) consume() { ack := pm.ack if pc.messagePool { - messagePool.Put(p) + messagePool.Put(pm) } if ack { pc.c.MarkCommitRecords(record) @@ -277,8 +315,7 @@ func (pc *consumer) consume() { if sp != nil { sp.Finish() } - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited") + pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)) return } if sp != nil { -- 2.49.1 From ea7ac4378ad8cfa7e0f999e193d033a8a3d379f9 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Thu, 27 Nov 2025 13:12:57 +0300 Subject: [PATCH 2/3] fix consume/killconsume --- subscriber.go | 69 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 15 deletions(-) diff --git a/subscriber.go b/subscriber.go index 331290e..7e1376a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -129,9 +129,9 @@ func (s *Subscriber) poll(ctx context.Context) { fetches.EachPartition(func(p kgo.FetchTopicPartition) { tps := tp{p.Topic, p.Partition} - s.mu.Lock() + s.mu.RLock() c := s.consumers[tps] - s.mu.Unlock() + s.mu.RUnlock() if c != nil { c.recs <- p } @@ -150,19 +150,36 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) tps := tp{topic, partition} s.mu.Lock() pc, ok := s.consumers[tps] + if ok { + delete(s.consumers, tps) + } s.mu.Unlock() if !ok || pc == nil { continue } - s.mu.Lock() - delete(s.consumers, tps) - s.mu.Unlock() - close(pc.quit) + if s.kopts.Logger.V(logger.DebugLevel) { - s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] waiting for work to finish topic %s partition %d", topic, partition)) + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] killing consumer topic %s partition %d", topic, partition)) } + + close(pc.quit) + wg.Add(1) - go func() { <-pc.done; wg.Done() }() + go func(c *consumer, t string, p int32) { + defer wg.Done() + + timeout := time.NewTimer(30 * time.Second) //waiting stop consumer mb set to opts/cfg TimeoutWaitKillConsumer + defer timeout.Stop() + + select { + case <-c.done: + if s.kopts.Logger.V(logger.DebugLevel) { + s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] consumer stopped topic %s partition %d", t, p)) + } + case <-timeout.C: + s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] timeout waiting for consumer topic %s partition %d", t, p)) + } + }(pc, topic, partition) } } } @@ -197,7 +214,6 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str if s.kopts.Logger.V(logger.DebugLevel) { s.kopts.Logger.Debug(ctx, fmt.Sprintf("[kgo] revoked %#+v", revoked)) } - s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { s.mu.Lock() tpc := make(map[tp]*consumer, len(s.consumers)) @@ -209,6 +225,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str } } } + s.killConsumers(ctx, revoked) } func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[string][]int32) { @@ -251,7 +268,10 @@ func (pc *consumer) consume() { select { case <-pc.quit: return - case p := <-pc.recs: + case p, ok := <-pc.recs: + if !ok { + return + } if p.Err != nil || p.FetchPartition.Err != nil { if p.Err != nil { @@ -274,6 +294,11 @@ func (pc *consumer) consume() { } for _, record := range p.Records { + select { + case <-pc.quit: + return + default: + } ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Inc() @@ -300,12 +325,24 @@ func (pc *consumer) consume() { pm.hdr.Set("Micro-Key", string(record.Key)) pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10)) - switch h := pc.handler.(type) { - case func(broker.Message) error: - err = h(pm) - case func([]broker.Message) error: - err = h([]broker.Message{pm}) + processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) //waiting process consumer mb set to opts/cfg TimeoutProccesWaitingHandle + errChan := make(chan error, 1) + + go func() { + switch h := pc.handler.(type) { + case func(broker.Message) error: + errChan <- h(pm) + case func([]broker.Message) error: + errChan <- h([]broker.Message{pm}) + } + }() + + select { + case err = <-errChan: + case <-processCtx.Done(): + err = fmt.Errorf("[kgo] message processing timeout topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset) } + cancel() pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() if err != nil { @@ -335,6 +372,8 @@ func (pc *consumer) consume() { continue } + pc.kopts.Logger.Error(pc.kopts.Context, fmt.Sprintf("[kgo] message not acknowledged topic %s partition %d offset %d", record.Topic, record.Partition, record.Offset)) + pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition) switch h := pc.handler.(type) { case func(broker.Message) error: -- 2.49.1 From bd97d3416eba2a3756b4c8032450aef2f9680d01 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Fri, 5 Dec 2025 15:38:47 +0300 Subject: [PATCH 3/3] removed old version golangcylint.yaml --- .golangci.yml | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 .golangci.yml diff --git a/.golangci.yml b/.golangci.yml deleted file mode 100644 index c6a7985..0000000 --- a/.golangci.yml +++ /dev/null @@ -1,5 +0,0 @@ -run: - concurrency: 8 - timeout: 5m - issues-exit-code: 1 - tests: true -- 2.49.1