diff --git a/kgo.go b/kgo.go index a4795dd..6dc893f 100644 --- a/kgo.go +++ b/kgo.go @@ -243,6 +243,12 @@ func (k *kBroker) Publish(ctx context.Context, topic string, msg *broker.Message } func (k *kBroker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + k.RLock() + if !k.connected { + k.RUnlock() + return broker.ErrNotConnected + } + k.RUnlock() options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) var errs []string