add copied metadata && update deps
Some checks failed
automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 5s
dependabot-automerge / automerge (pull_request) Has been skipped
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled

This commit is contained in:
2024-11-21 22:15:28 +03:00
parent aedd60ea87
commit ba9b88c650
4 changed files with 47 additions and 120 deletions

View File

@@ -52,10 +52,12 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
r.Context = context.Background()
}
md, ok := metadata.FromOutgoingContext(r.Context)
omd, ok := metadata.FromOutgoingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
omd = metadata.New(len(r.Headers))
}
md := metadata.Copy(omd)
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
@@ -66,9 +68,7 @@ func (m *hookTracer) OnProduceRecordBuffered(r *kgo.Record) {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
}
md, _ = metadata.FromOutgoingContext(r.Context)
setHeaders(r, md)
setHeaders(r, omd)
}
// OnProduceRecordUnbuffered continues and ends the "publish" span for an
@@ -119,10 +119,12 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
if r.Context == nil {
r.Context = context.Background()
}
md, ok := metadata.FromIncomingContext(r.Context)
omd, ok := metadata.FromIncomingContext(r.Context)
if !ok {
md = metadata.New(len(r.Headers))
omd = metadata.New(len(r.Headers))
}
md := metadata.Copy(omd)
for _, h := range r.Headers {
md.Set(h.Key, string(h.Value))
}
@@ -133,9 +135,7 @@ func (m *hookTracer) OnFetchRecordBuffered(r *kgo.Record) {
r.Context, _ = m.tracer.Start(r.Context, "sdk.broker", opts...)
}
md, _ = metadata.FromIncomingContext(r.Context)
setHeaders(r, md)
setHeaders(r, omd)
}
// OnFetchRecordUnbuffered continues and ends the "receive" span for an