Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d0a959611d | |||
|
|
acb7fd2b11 |
29
kgo.go
29
kgo.go
@@ -568,19 +568,22 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
||||
messagePool: messagePool,
|
||||
}
|
||||
|
||||
kopts := append(b.kopts,
|
||||
kgo.ConsumerGroup(options.Group),
|
||||
kgo.ConsumeTopics(topic),
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||
kgo.FetchMaxWait(1*time.Second),
|
||||
kgo.AutoCommitInterval(commitInterval),
|
||||
kgo.OnPartitionsAssigned(sub.assigned),
|
||||
kgo.OnPartitionsRevoked(sub.revoked),
|
||||
kgo.StopProducerOnDataLossDetected(),
|
||||
kgo.OnPartitionsLost(sub.lost),
|
||||
kgo.AutoCommitCallback(sub.autocommit),
|
||||
kgo.AutoCommitMarks(),
|
||||
kgo.WithHooks(sub),
|
||||
kopts := append(
|
||||
[]kgo.Opt{
|
||||
kgo.ConsumerGroup(options.Group),
|
||||
kgo.ConsumeTopics(topic),
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||
kgo.FetchMaxWait(1 * time.Second),
|
||||
kgo.AutoCommitInterval(commitInterval),
|
||||
kgo.OnPartitionsAssigned(sub.assigned),
|
||||
kgo.OnPartitionsRevoked(sub.revoked),
|
||||
kgo.StopProducerOnDataLossDetected(),
|
||||
kgo.OnPartitionsLost(sub.lost),
|
||||
kgo.AutoCommitCallback(sub.autocommit),
|
||||
kgo.AutoCommitMarks(),
|
||||
kgo.WithHooks(sub),
|
||||
},
|
||||
b.kopts...,
|
||||
)
|
||||
|
||||
if options.Context != nil {
|
||||
|
||||
Reference in New Issue
Block a user