From 79d80a6fc87f0cd1d9642300950deedc7e17691d Mon Sep 17 00:00:00 2001 From: Evstigneev Denis Date: Fri, 30 Jan 2026 09:45:00 +0300 Subject: [PATCH] correction create opts --- kgo.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/kgo.go b/kgo.go index d34012a..507b8f7 100644 --- a/kgo.go +++ b/kgo.go @@ -477,19 +477,22 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han connected: k.connected, } - kopts := append(k.kopts, - kgo.ConsumerGroup(options.Group), - kgo.ConsumeTopics(topic), - kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - kgo.FetchMaxWait(1*time.Second), - kgo.AutoCommitInterval(commitInterval), - kgo.OnPartitionsAssigned(sub.assigned), - kgo.OnPartitionsRevoked(sub.revoked), - kgo.StopProducerOnDataLossDetected(), - kgo.OnPartitionsLost(sub.lost), - kgo.AutoCommitCallback(sub.autocommit), - kgo.AutoCommitMarks(), - kgo.WithHooks(sub), + kopts := append( + []kgo.Opt{ + kgo.ConsumerGroup(options.Group), + kgo.ConsumeTopics(topic), + kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + kgo.FetchMaxWait(1 * time.Second), + kgo.AutoCommitInterval(commitInterval), + kgo.OnPartitionsAssigned(sub.assigned), + kgo.OnPartitionsRevoked(sub.revoked), + kgo.StopProducerOnDataLossDetected(), + kgo.OnPartitionsLost(sub.lost), + kgo.AutoCommitCallback(sub.autocommit), + kgo.AutoCommitMarks(), + kgo.WithHooks(sub), + }, + k.kopts..., ) if options.Context != nil { -- 2.49.1