Set topic header

This commit is contained in:
Asim Aslam 2019-01-10 20:35:10 +00:00
parent 59d82b0abe
commit 40ff5b749b

View File

@ -9,6 +9,7 @@ import (
"sync/atomic"
"github.com/google/uuid"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
@ -444,7 +445,11 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
if !ok {
md = make(map[string]string)
}
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["X-Micro-Topic"] = msg.Topic()
md["X-Micro-Id"] = id
// encode message body
cf, err := r.newCodec(msg.ContentType())
@ -452,7 +457,13 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil {
if err := cf(b).Write(&codec.Message{
Type: codec.Publication,
Header: map[string]string{
"X-Micro-Id": id,
"X-Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {