diff --git a/drpc.go b/drpc.go index c354729..fff63f4 100644 --- a/drpc.go +++ b/drpc.go @@ -2,7 +2,6 @@ package drpc // import "go.unistack.org/micro-client-drpc/v3" import ( - "bytes" "context" "fmt" "net" @@ -615,6 +614,8 @@ func (c *drpcClient) Publish(ctx context.Context, msg client.Message, opts ...cl } func (c *drpcClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { + var body []byte + options := client.NewPublishOptions(opts...) // get proxy @@ -633,24 +634,22 @@ func (c *drpcClient) publish(ctx context.Context, ps []client.Message, opts ...c for _, p := range ps { md := metadata.Copy(omd) md[metadata.HeaderContentType] = p.ContentType() - md[metadata.HeaderTopic] = p.Topic() - - cf, err := c.newCodec(p.ContentType()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - - var body []byte // passed in raw data if d, ok := p.Payload().(*codec.Frame); ok { body = d.Data } else { - b := bytes.NewBuffer(nil) - if err := cf.Write(b, &codec.Message{Type: codec.Event}, p.Payload()); err != nil { + // use codec for payload + cf, err := c.newCodec(p.ContentType()) + if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } - body = b.Bytes() + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b } topic := p.Topic() @@ -658,6 +657,9 @@ func (c *drpcClient) publish(ctx context.Context, ps []client.Message, opts ...c topic = exchange } + for k, v := range p.Metadata() { + md.Set(k, v) + } md.Set(metadata.HeaderTopic, topic) msgs = append(msgs, &broker.Message{Header: md, Body: body}) }