diff --git a/kgo.go b/kgo.go index ef0b650..0197d9a 100644 --- a/kgo.go +++ b/kgo.go @@ -196,8 +196,10 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { k.RLock() - if !k.connected { - k.RUnlock() + ok := k.connected + k.RUnlock() + + if !ok { k.Lock() c, err := k.connect(ctx, k.kopts...) if err != nil { @@ -208,7 +210,6 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.connected = true k.Unlock() } - k.RUnlock() options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs))