diff --git a/http.go b/http.go index 8e843df..b29468a 100644 --- a/http.go +++ b/http.go @@ -604,6 +604,8 @@ func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...clie } func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + var body []byte + options := client.NewPublishOptions(opts...) // get proxy @@ -622,24 +624,22 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c for _, p := range ps { md := metadata.Copy(omd) md[metadata.HeaderContentType] = p.ContentType() - md[metadata.HeaderTopic] = p.Topic() - - cf, err := h.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - var body []byte // passed in raw data if d, ok := p.Payload().(*codec.Frame); ok { body = d.Data } else { - b := bytes.NewBuffer(nil) - if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil { + // use codec for payload + cf, err := h.newCodec(p.ContentType()) + if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } - body = b.Bytes() + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b } topic := p.Topic() @@ -647,6 +647,9 @@ func (h *httpClient) publish(ctx context.Context, ps []client.Message, opts ...c topic = exchange } + for k, v := range p.Metadata() { + md.Set(k, v) + } md.Set(metadata.HeaderTopic, topic) msgs = append(msgs, &broker.Message{Header: md, Body: body}) }