diff --git a/client/rpc_client.go b/client/rpc_client.go index 336dd34f..b85e76b7 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -9,6 +9,7 @@ import ( "sync/atomic" + "github.com/google/uuid" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/errors" @@ -444,7 +445,11 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt if !ok { md = make(map[string]string) } + + id := uuid.New().String() md["Content-Type"] = msg.ContentType() + md["X-Micro-Topic"] = msg.Topic() + md["X-Micro-Id"] = id // encode message body cf, err := r.newCodec(msg.ContentType()) @@ -452,7 +457,13 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt return errors.InternalServerError("go.micro.client", err.Error()) } b := &buffer{bytes.NewBuffer(nil)} - if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil { + if err := cf(b).Write(&codec.Message{ + Type: codec.Publication, + Header: map[string]string{ + "X-Micro-Id": id, + "X-Micro-Topic": msg.Topic(), + }, + }, msg.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } r.once.Do(func() {