diff --git a/kgo.go b/kgo.go index ce59452..2192206 100644 --- a/kgo.go +++ b/kgo.go @@ -344,40 +344,31 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker } func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error { - records := make([]*kgo.Record, 0, len(messages)) - var errs []string - var key []byte - var promise func(*kgo.Record, error) + var records []*kgo.Record for _, msg := range messages { - if mctx := msg.Context(); mctx != nil { - if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil { - key = k - } - if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { - promise = p - } - } rec := &kgo.Record{ - Context: ctx, - Key: key, + Context: msg.Context(), Topic: topic, Value: msg.Body(), } - b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() + var promise func(*kgo.Record, error) + if rec.Context != nil { + if k, ok := rec.Context.Value(messageKey{}).([]byte); ok && k != nil { + rec.Key = k + } + if p, ok := rec.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil { + promise = p + } + } setHeaders(rec, msg.Header()) - records = append(records, rec) - } - - ts := time.Now() - - if promise != nil { - - for _, rec := range records { + if promise != nil { + ts := time.Now() + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc() b.c.Produce(ctx, rec, func(r *kgo.Record, err error) { te := time.Since(ts) b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec() @@ -390,27 +381,33 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M } promise(r, err) }) - } - return nil - } - - results := b.c.ProduceSync(ctx, records...) - - te := time.Since(ts) - for _, result := range results { - b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) - b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) - b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() - if result.Err != nil { - b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() - errs = append(errs, result.Err.Error()) + continue } else { - b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + records = append(records, rec) } } - if len(errs) > 0 { - return fmt.Errorf("publish error: %s", strings.Join(errs, "\n")) + if len(records) > 0 { + var errs []string + ts := time.Now() + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Set(uint64(len(records))) + results := b.c.ProduceSync(ctx, records...) + te := time.Since(ts) + for _, result := range results { + b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds()) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec() + if result.Err != nil { + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc() + errs = append(errs, result.Err.Error()) + } else { + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc() + } + } + + if len(errs) > 0 { + return fmt.Errorf("publish error: %s", strings.Join(errs, "\n")) + } } return nil diff --git a/kgo_test.go b/kgo_test.go index d4e3ab5..0bfb552 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -81,7 +81,7 @@ func TestFail(t *testing.T) { go func() { for _, msg := range msgs { // t.Logf("broker publish") - if err := b.Publish(ctx, "test", msg); err != nil { + if err := b.Publish(ctx, "test.fail", msg); err != nil { t.Fatal(err) } } @@ -96,7 +96,7 @@ func TestFail(t *testing.T) { return msg.Ack() } - sub, err := b.Subscribe(ctx, "test", fn, + sub, err := b.Subscribe(ctx, "test.fail", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup(group), broker.SubscribeBodyOnly(true)) @@ -184,7 +184,7 @@ func TestPubSub(t *testing.T) { msgs = append(msgs, m) } - if err := b.Publish(ctx, "test", msgs...); err != nil { + if err := b.Publish(ctx, "test.pubsub", msgs...); err != nil { t.Fatal(err) } // t.Skip() @@ -197,7 +197,7 @@ func TestPubSub(t *testing.T) { return msg.Ack() } - sub, err := b.Subscribe(ctx, "test", fn, + sub, err := b.Subscribe(ctx, "test.pubsub", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup(group), broker.SubscribeBodyOnly(true)) diff --git a/options.go b/options.go index b7f8d98..4f7eae3 100644 --- a/options.go +++ b/options.go @@ -116,10 +116,3 @@ type subscribeMessagePoolKey struct{} func SubscribeMessagePool(b bool) broker.SubscribeOption { return broker.SetSubscribeOption(subscribeMessagePoolKey{}, b) } - -type subscribeMessagePoolKey struct{} - -// SubscribeMessagePool optionaly enabled/disable message pool -func SubscribeMessagePool(b bool) broker.SubscribeOption { - return broker.SetSubscribeOption(subscribeMessagePoolKey{}, b) -} diff --git a/subscriber.go b/subscriber.go index e7557b8..c61854a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -39,7 +39,6 @@ type consumer struct { } type Subscriber struct { -<<<<<<< HEAD consumers map[tp]*consumer c *kgo.Client htracer *hookTracer