diff --git a/kgo.go b/kgo.go index c7b645f..ba243e7 100644 --- a/kgo.go +++ b/kgo.go @@ -568,19 +568,22 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac messagePool: messagePool, } - kopts := append(b.kopts, - kgo.ConsumerGroup(options.Group), - kgo.ConsumeTopics(topic), - kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - kgo.FetchMaxWait(1*time.Second), - kgo.AutoCommitInterval(commitInterval), - kgo.OnPartitionsAssigned(sub.assigned), - kgo.OnPartitionsRevoked(sub.revoked), - kgo.StopProducerOnDataLossDetected(), - kgo.OnPartitionsLost(sub.lost), - kgo.AutoCommitCallback(sub.autocommit), - kgo.AutoCommitMarks(), - kgo.WithHooks(sub), + kopts := append( + []kgo.Opt{ + kgo.ConsumerGroup(options.Group), + kgo.ConsumeTopics(topic), + kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + kgo.FetchMaxWait(1 * time.Second), + kgo.AutoCommitInterval(commitInterval), + kgo.OnPartitionsAssigned(sub.assigned), + kgo.OnPartitionsRevoked(sub.revoked), + kgo.StopProducerOnDataLossDetected(), + kgo.OnPartitionsLost(sub.lost), + kgo.AutoCommitCallback(sub.autocommit), + kgo.AutoCommitMarks(), + kgo.WithHooks(sub), + }, + b.kopts..., ) if options.Context != nil {