From acb7fd2b11a2ef5c7a4eade710d6ae79d748c460 Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Fri, 30 Jan 2026 09:52:41 +0300 Subject: [PATCH] correction create opts --- kgo.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/kgo.go b/kgo.go index c7b645f..ba243e7 100644 --- a/kgo.go +++ b/kgo.go @@ -568,19 +568,22 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac messagePool: messagePool, } - kopts := append(b.kopts, - kgo.ConsumerGroup(options.Group), - kgo.ConsumeTopics(topic), - kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - kgo.FetchMaxWait(1*time.Second), - kgo.AutoCommitInterval(commitInterval), - kgo.OnPartitionsAssigned(sub.assigned), - kgo.OnPartitionsRevoked(sub.revoked), - kgo.StopProducerOnDataLossDetected(), - kgo.OnPartitionsLost(sub.lost), - kgo.AutoCommitCallback(sub.autocommit), - kgo.AutoCommitMarks(), - kgo.WithHooks(sub), + kopts := append( + []kgo.Opt{ + kgo.ConsumerGroup(options.Group), + kgo.ConsumeTopics(topic), + kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + kgo.FetchMaxWait(1 * time.Second), + kgo.AutoCommitInterval(commitInterval), + kgo.OnPartitionsAssigned(sub.assigned), + kgo.OnPartitionsRevoked(sub.revoked), + kgo.StopProducerOnDataLossDetected(), + kgo.OnPartitionsLost(sub.lost), + kgo.AutoCommitCallback(sub.autocommit), + kgo.AutoCommitMarks(), + kgo.WithHooks(sub), + }, + b.kopts..., ) if options.Context != nil { -- 2.49.1