added check span!=nil && conn close #154

Merged
vtolstov merged 2 commits from devstigneev/micro-broker-kgo:v3 into v3 2025-05-21 11:56:48 +03:00
Showing only changes of commit 397f5414b3 - Show all commits

13
kgo.go
View File

@@ -280,19 +280,6 @@ func (k *Broker) Publish(ctx context.Context, topic string, msg *broker.Message,
} }
func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
k.Lock()
if k.connected.Load() == 0 {
c, _, err := k.connect(ctx, k.kopts...)
if err != nil {
k.Unlock()
return err
}
k.c.Close()
k.c = c
k.connected.Store(1)
}
k.Unlock()
options := broker.NewPublishOptions(opts...) options := broker.NewPublishOptions(opts...)
records := make([]*kgo.Record, 0, len(msgs)) records := make([]*kgo.Record, 0, len(msgs))
var errs []string var errs []string