Compare commits

..

2 Commits

Author SHA1 Message Date
d0a959611d Merge pull request 'correction create opts' (#160) from devstigneev/micro-broker-kgo:fix_opts into v4
Some checks failed
coverage / build (push) Failing after 3m15s
test / test (push) Failing after 17m25s
sync / sync (push) Failing after 35s
Reviewed-on: #160
2026-01-30 09:58:17 +03:00
Evstigneev Denis
acb7fd2b11 correction create opts
Some checks failed
coverage / build (pull_request) Failing after 2m25s
test / test (pull_request) Failing after 4m35s
lint / lint (pull_request) Failing after 4m43s
2026-01-30 09:52:41 +03:00

29
kgo.go
View File

@@ -568,19 +568,22 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
messagePool: messagePool, messagePool: messagePool,
} }
kopts := append(b.kopts, kopts := append(
kgo.ConsumerGroup(options.Group), []kgo.Opt{
kgo.ConsumeTopics(topic), kgo.ConsumerGroup(options.Group),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.ConsumeTopics(topic),
kgo.FetchMaxWait(1*time.Second), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.AutoCommitInterval(commitInterval), kgo.FetchMaxWait(1 * time.Second),
kgo.OnPartitionsAssigned(sub.assigned), kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsRevoked(sub.revoked), kgo.OnPartitionsAssigned(sub.assigned),
kgo.StopProducerOnDataLossDetected(), kgo.OnPartitionsRevoked(sub.revoked),
kgo.OnPartitionsLost(sub.lost), kgo.StopProducerOnDataLossDetected(),
kgo.AutoCommitCallback(sub.autocommit), kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitMarks(), kgo.AutoCommitCallback(sub.autocommit),
kgo.WithHooks(sub), kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
},
b.kopts...,
) )
if options.Context != nil { if options.Context != nil {