fixup md #116

Merged
vtolstov merged 1 commits from pubmdfix into v3 2023-12-21 00:16:23 +03:00

19
http.go
View File

@ -637,6 +637,10 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
if v, ok := os.LookupEnv("MICRO_PROXY"); ok { if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v exchange = v
} }
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
omd, ok := metadata.FromOutgoingContext(ctx) omd, ok := metadata.FromOutgoingContext(ctx)
if !ok { if !ok {
@ -648,7 +652,11 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
for _, p := range ps { for _, p := range ps {
md := metadata.Copy(omd) md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType() md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator() iter := p.Metadata().Iterator()
var k, v string var k, v string
for iter.Next(&k, &v) { for iter.Next(&k, &v) {
@ -672,15 +680,6 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c
body = b body = b
} }
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body}) msgs = append(msgs, &broker.Message{Header: md, Body: body})
} }