correction create opts #159
5
kgo.go
5
kgo.go
@@ -477,7 +477,8 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
connected: k.connected,
|
||||
}
|
||||
|
||||
kopts := append(k.kopts,
|
||||
kopts := append(
|
||||
[]kgo.Opt{
|
||||
kgo.ConsumerGroup(options.Group),
|
||||
kgo.ConsumeTopics(topic),
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||
@@ -490,6 +491,8 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
kgo.AutoCommitCallback(sub.autocommit),
|
||||
kgo.AutoCommitMarks(),
|
||||
kgo.WithHooks(sub),
|
||||
},
|
||||
k.kopts...,
|
||||
)
|
||||
|
||||
if options.Context != nil {
|
||||
|
||||
Reference in New Issue
Block a user