fixup panic
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m42s
prbuild / test (pull_request) Failing after 1m29s
prbuild / lint (pull_request) Failing after 2m37s
autoapprove / autoapprove (pull_request) Failing after 1m24s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
Some checks failed
codeql / analyze (go) (pull_request) Failing after 2m42s
prbuild / test (pull_request) Failing after 1m29s
prbuild / lint (pull_request) Failing after 2m37s
autoapprove / autoapprove (pull_request) Failing after 1m24s
automerge / automerge (pull_request) Failing after 4s
dependabot-automerge / automerge (pull_request) Has been skipped
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
7
kgo.go
7
kgo.go
@@ -197,8 +197,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
|
||||
|
||||
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
k.RLock()
|
||||
if !k.connected {
|
||||
k.RUnlock()
|
||||
ok := k.connected
|
||||
k.RUnlock()
|
||||
|
||||
if !ok {
|
||||
k.Lock()
|
||||
c, err := k.connect(ctx, k.kopts...)
|
||||
if err != nil {
|
||||
@@ -209,7 +211,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br
|
||||
k.connected = true
|
||||
k.Unlock()
|
||||
}
|
||||
k.RUnlock()
|
||||
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
records := make([]*kgo.Record, 0, len(msgs))
|
||||
|
Reference in New Issue
Block a user