Merge pull request 'fixup md' (#131) from pubmdfix into v3
Some checks failed
build / test (push) Failing after 1m28s
build / lint (push) Failing after 2m36s
codeql / analyze (go) (push) Failing after 2m50s

Reviewed-on: #131
This commit is contained in:
Василий Толстов 2023-12-21 00:17:46 +03:00
commit 9d70c4dd34

19
grpc.go
View File

@ -718,6 +718,10 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
if v, ok := os.LookupEnv("MICRO_PROXY"); ok { if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v exchange = v
} }
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
msgs := make([]*broker.Message, 0, len(ps)) msgs := make([]*broker.Message, 0, len(ps))
@ -729,7 +733,11 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
for _, p := range ps { for _, p := range ps {
md := metadata.Copy(omd) md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType() md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator() iter := p.Metadata().Iterator()
var k, v string var k, v string
for iter.Next(&k, &v) { for iter.Next(&k, &v) {
@ -753,15 +761,6 @@ func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...c
body = b body = b
} }
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
for k, v := range p.Metadata() {
md.Set(k, v)
}
md.Set(metadata.HeaderTopic, topic)
msgs = append(msgs, &broker.Message{Header: md, Body: body}) msgs = append(msgs, &broker.Message{Header: md, Body: body})
} }