diff --git a/kgo.go b/kgo.go index 90c87b6..0e990f0 100644 --- a/kgo.go +++ b/kgo.go @@ -267,7 +267,7 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br k.connected.Store(1) } k.Unlock() - fmt.Printf("EEE\n") + options := broker.NewPublishOptions(opts...) records := make([]*kgo.Record, 0, len(msgs)) var errs []string @@ -322,9 +322,9 @@ func (k *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...br return nil } ts := time.Now() - fmt.Printf("SSSSSSEEE\n") + results := k.c.ProduceSync(ctx, records...) - fmt.Printf("SSSSSS\n") + te := time.Since(ts) for _, result := range results { k.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) diff --git a/kgo_test.go b/kgo_test.go index 7f09b05..a1955dd 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -2,7 +2,6 @@ package kgo_test import ( "context" - "fmt" "os" "strings" "sync/atomic" @@ -214,7 +213,7 @@ func TestPubSub(t *testing.T) { if prc := atomic.LoadInt64(&idx); prc == msgcnt { close(done) } else { - fmt.Printf("processed %v\n", prc) + t.Logf("processed %v\n", prc) } case <-ticker.C: close(done)