diff --git a/subscriber.go b/subscriber.go index c61854a..cde2f68 100644 --- a/subscriber.go +++ b/subscriber.go @@ -271,7 +271,11 @@ func (pc *consumer) consume() { for _, hdr := range record.Headers { pm.hdr.Set(hdr.Key, string(hdr.Value)) } - + pm.hdr.Set("Micro-Offset", strconv.FormatInt(record.Offset, 10)) + pm.hdr.Set("Micro-Partition", strconv.FormatInt(int64(record.Partition), 10)) + pm.hdr.Set("Micro-Topic", record.Topic) + pm.hdr.Set("Micro-Key", string(record.Key)) + pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10)) switch h := pc.handler.(type) { case func(broker.Message) error: err = h(pm)