add additional metadata to message, closes #194
Some checks failed
sync / sync (push) Failing after 12m3s
coverage / build (push) Failing after 13m41s
test / test (push) Successful in 4m0s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-07-15 15:59:05 +03:00
parent cced3b84c1
commit 63d82be92a

View File

@@ -271,7 +271,11 @@ func (pc *consumer) consume() {
for _, hdr := range record.Headers {
pm.hdr.Set(hdr.Key, string(hdr.Value))
}
pm.hdr.Set("Micro-Offset", strconv.FormatInt(record.Offset, 10))
pm.hdr.Set("Micro-Partition", strconv.FormatInt(int64(record.Partition), 10))
pm.hdr.Set("Micro-Topic", record.Topic)
pm.hdr.Set("Micro-Key", string(record.Key))
pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10))
switch h := pc.handler.(type) {
case func(broker.Message) error:
err = h(pm)