Compare commits

...

2 Commits

Author SHA1 Message Date
8a069e9faf Merge pull request 'correction create opts' (#159) from devstigneev/micro-broker-kgo:v3_fix_opts into v3
Some checks failed
test / test (push) Failing after 17m33s
coverage / build (push) Failing after 17m44s
Reviewed-on: #159
2026-01-30 09:48:26 +03:00
Evstigneev Denis
79d80a6fc8 correction create opts
Some checks failed
coverage / build (pull_request) Failing after 3m48s
test / test (pull_request) Failing after 3m40s
lint / lint (pull_request) Failing after 4m2s
2026-01-30 09:45:00 +03:00

29
kgo.go
View File

@@ -477,19 +477,22 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
connected: k.connected,
}
kopts := append(k.kopts,
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(1*time.Second),
kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsAssigned(sub.assigned),
kgo.OnPartitionsRevoked(sub.revoked),
kgo.StopProducerOnDataLossDetected(),
kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitCallback(sub.autocommit),
kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
kopts := append(
[]kgo.Opt{
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(1 * time.Second),
kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsAssigned(sub.assigned),
kgo.OnPartitionsRevoked(sub.revoked),
kgo.StopProducerOnDataLossDetected(),
kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitCallback(sub.autocommit),
kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
},
k.kopts...,
)
if options.Context != nil {