correction create opts #160

Merged
vtolstov merged 1 commits from devstigneev/micro-broker-kgo:fix_opts into v4 2026-01-30 09:58:19 +03:00
Showing only changes of commit acb7fd2b11 - Show all commits

29
kgo.go
View File

@@ -568,19 +568,22 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
messagePool: messagePool,
}
kopts := append(b.kopts,
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(1*time.Second),
kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsAssigned(sub.assigned),
kgo.OnPartitionsRevoked(sub.revoked),
kgo.StopProducerOnDataLossDetected(),
kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitCallback(sub.autocommit),
kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
kopts := append(
[]kgo.Opt{
kgo.ConsumerGroup(options.Group),
kgo.ConsumeTopics(topic),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.FetchMaxWait(1 * time.Second),
kgo.AutoCommitInterval(commitInterval),
kgo.OnPartitionsAssigned(sub.assigned),
kgo.OnPartitionsRevoked(sub.revoked),
kgo.StopProducerOnDataLossDetected(),
kgo.OnPartitionsLost(sub.lost),
kgo.AutoCommitCallback(sub.autocommit),
kgo.AutoCommitMarks(),
kgo.WithHooks(sub),
},
b.kopts...,
)
if options.Context != nil {