From fe764ae5a348d9cf655482c68831c010c9514ac9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 5 Mar 2022 17:10:25 +0300 Subject: [PATCH] parent bce6f8c3e75dfe7844a0458a2a01fb437a9ccc77 author Vasiliy Tolstov 1646489425 +0300 committer Vasiliy Tolstov 1646494543 +0300 add message metadata support Signed-off-by: Vasiliy Tolstov --- .github/workflows/automerge.yml | 2 +- http.go | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/.github/workflows/automerge.yml b/.github/workflows/automerge.yml index 2f12b4f..b0c94b4 100644 --- a/.github/workflows/automerge.yml +++ b/.github/workflows/automerge.yml @@ -1,4 +1,4 @@ -name: "prautomerge" +name: "automerge" on: pull_request_target: diff --git a/http.go b/http.go index 8e843df..b29468a 100644 --- a/http.go +++ b/http.go @@ -604,6 +604,8 @@ func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...clie } func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + var body []byte + options := client.NewPublishOptions(opts...) // get proxy @@ -622,24 +624,22 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c for _, p := range ps { md := metadata.Copy(omd) md[metadata.HeaderContentType] = p.ContentType() - md[metadata.HeaderTopic] = p.Topic() - - cf, err := h.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - var body []byte // passed in raw data if d, ok := p.Payload().(*codec.Frame); ok { body = d.Data } else { - b := bytes.NewBuffer(nil) - if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil { + // use codec for payload + cf, err := h.newCodec(p.ContentType()) + if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } - body = b.Bytes() + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b } topic := p.Topic() @@ -647,6 +647,9 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c topic = exchange } + for k, v := range p.Metadata() { + md.Set(k, v) + } md.Set(metadata.HeaderTopic, topic) msgs = append(msgs, &broker.Message{Header: md, Body: body}) }