minimize checks on publish
Some checks failed
coverage / build (push) Successful in 4m51s
sync / sync (push) Has been skipped
test / test (push) Failing after 15m31s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-05-21 09:54:16 +03:00
parent 969e459e3d
commit d6e73a3419
2 changed files with 1 additions and 12 deletions

11
kgo.go
View File

@@ -344,23 +344,12 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
}
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
if b.connected.Load() == 0 {
c, _, err := b.connect(ctx, b.kopts...)
if err != nil {
return err
}
b.Lock()
b.c = c
b.Unlock()
}
records := make([]*kgo.Record, 0, len(messages))
var errs []string
var key []byte
var promise func(*kgo.Record, error)
for _, msg := range messages {
if mctx := msg.Context(); mctx != nil {
if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil {
key = k

View File

@@ -222,7 +222,7 @@ func TestPubSub(t *testing.T) {
if prc := atomic.LoadInt64(&idx); prc == msgcnt {
close(done)
} else {
t.Logf("processed %v\n", prc)
t.Logf("processed %v of %v\n", prc, msgcnt)
}
case <-ticker.C:
close(done)