|
|
|
|
@@ -156,7 +156,7 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|
|
|
|
options.ContentType = b.opts.ContentType
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m := &kgoMessage{ctx: ctx, hdr: hdr, opts: options}
|
|
|
|
|
m := &kgoMessage{ctx: ctx, hdr: hdr.Copy(), opts: options}
|
|
|
|
|
c, err := b.newCodec(m.opts.ContentType)
|
|
|
|
|
if err == nil {
|
|
|
|
|
m.body, err = c.Marshal(body)
|
|
|
|
|
@@ -165,6 +165,8 @@ func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body int
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
m.hdr.Set(metadata.HeaderContentType, m.opts.ContentType)
|
|
|
|
|
|
|
|
|
|
return m, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -344,40 +346,44 @@ func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
|
|
|
|
|
records := make([]*kgo.Record, 0, len(messages))
|
|
|
|
|
var errs []string
|
|
|
|
|
var key []byte
|
|
|
|
|
var promise func(*kgo.Record, error)
|
|
|
|
|
var records []*kgo.Record
|
|
|
|
|
|
|
|
|
|
for _, msg := range messages {
|
|
|
|
|
if mctx := msg.Context(); mctx != nil {
|
|
|
|
|
if k, ok := mctx.Value(messageKey{}).([]byte); ok && k != nil {
|
|
|
|
|
key = k
|
|
|
|
|
}
|
|
|
|
|
if p, ok := mctx.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
|
|
|
|
promise = p
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rec := &kgo.Record{
|
|
|
|
|
Context: ctx,
|
|
|
|
|
Key: key,
|
|
|
|
|
Context: msg.Context(),
|
|
|
|
|
Topic: topic,
|
|
|
|
|
Value: msg.Body(),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
|
|
|
|
var promise func(*kgo.Record, error)
|
|
|
|
|
if rec.Context != nil {
|
|
|
|
|
if k, ok := rec.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
|
|
|
|
rec.Key = k
|
|
|
|
|
}
|
|
|
|
|
if p, ok := rec.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
|
|
|
|
promise = p
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
kmsg, ok := msg.(*kgoMessage)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if kmsg.opts.Context != nil {
|
|
|
|
|
if k, ok := kmsg.opts.Context.Value(messageKey{}).([]byte); ok && k != nil {
|
|
|
|
|
rec.Key = k
|
|
|
|
|
}
|
|
|
|
|
if p, ok := kmsg.opts.Context.Value(messagePromiseKey{}).(func(*kgo.Record, error)); ok && p != nil {
|
|
|
|
|
promise = p
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHeaders(rec, msg.Header())
|
|
|
|
|
|
|
|
|
|
records = append(records, rec)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ts := time.Now()
|
|
|
|
|
|
|
|
|
|
if promise != nil {
|
|
|
|
|
|
|
|
|
|
for _, rec := range records {
|
|
|
|
|
if promise != nil {
|
|
|
|
|
ts := time.Now()
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Inc()
|
|
|
|
|
b.c.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
|
|
|
|
te := time.Since(ts)
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", rec.Topic, "topic", rec.Topic).Dec()
|
|
|
|
|
@@ -390,27 +396,33 @@ func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.M
|
|
|
|
|
}
|
|
|
|
|
promise(r, err)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
results := b.c.ProduceSync(ctx, records...)
|
|
|
|
|
|
|
|
|
|
te := time.Since(ts)
|
|
|
|
|
for _, result := range results {
|
|
|
|
|
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
|
|
|
|
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec()
|
|
|
|
|
if result.Err != nil {
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
|
|
|
|
|
errs = append(errs, result.Err.Error())
|
|
|
|
|
continue
|
|
|
|
|
} else {
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
|
|
|
|
|
records = append(records, rec)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
|
|
|
|
if len(records) > 0 {
|
|
|
|
|
var errs []string
|
|
|
|
|
ts := time.Now()
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Set(uint64(len(records)))
|
|
|
|
|
results := b.c.ProduceSync(ctx, records...)
|
|
|
|
|
te := time.Since(ts)
|
|
|
|
|
for _, result := range results {
|
|
|
|
|
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
|
|
|
|
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Update(te.Seconds())
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", result.Record.Topic, "topic", result.Record.Topic).Dec()
|
|
|
|
|
if result.Err != nil {
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "failure").Inc()
|
|
|
|
|
errs = append(errs, result.Err.Error())
|
|
|
|
|
} else {
|
|
|
|
|
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", result.Record.Topic, "topic", result.Record.Topic, "status", "success").Inc()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(errs) > 0 {
|
|
|
|
|
return fmt.Errorf("publish error: %s", strings.Join(errs, "\n"))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|