From d6e73a3419e3df0747ac8de88eec261a7a1ac4a0 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 21 May 2025 09:54:16 +0300 Subject: [PATCH] minimize checks on publish Signed-off-by: Vasiliy Tolstov --- kgo.go | 11 ----------- kgo_test.go | 2 +- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/kgo.go b/kgo.go index dcb9dfa..c87ea02 100644 --- a/kgo.go +++ b/kgo.go @@ -344,23 +344,12 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker } func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error { - if b.connected.Load() == 0 { - c, _, err := b.connect(ctx, b.kopts...) - if err != nil { - return err - } - b.Lock() - b.c = c - b.Unlock() - } - records := make([]*kgo.Record, 0, len(messages)) var errs []string var key []byte var promise func(*kgo.Record, error) for _, msg := range messages { - if mctx := msg.Context(); mctx != nil { if k, ok := mctx.Value(publishKey{}).([]byte); ok && k != nil { key = k diff --git a/kgo_test.go b/kgo_test.go index fd549d7..d4e3ab5 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -222,7 +222,7 @@ func TestPubSub(t *testing.T) { if prc := atomic.LoadInt64(&idx); prc == msgcnt { close(done) } else { - t.Logf("processed %v\n", prc) + t.Logf("processed %v of %v\n", prc, msgcnt) } case <-ticker.C: close(done)