diff --git a/client/options.go b/client/options.go index dca251c6..30a6386c 100644 --- a/client/options.go +++ b/client/options.go @@ -65,6 +65,8 @@ type CallOptions struct { } type PublishOptions struct { + // Exchange is the routing exchange for the message + Exchange string // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -236,6 +238,13 @@ func DialTimeout(d time.Duration) Option { // Call Options +// WithExchange sets the exchange to route a message through +func WithExchange(e string) PublishOption { + return func(o *PublishOptions) { + o.Exchange = e + } +} + // WithAddress sets the remote address to use rather than using service discovery func WithAddress(a string) CallOption { return func(o *CallOptions) { diff --git a/client/rpc_client.go b/client/rpc_client.go index 864eaad1..b50f79f9 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -498,6 +498,13 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt } func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { + options := PublishOptions{ + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) @@ -508,6 +515,12 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt md["Micro-Topic"] = msg.Topic() md["Micro-Id"] = id + // get the routing exchange + topic := msg.Topic() + if len(options.Exchange) > 0 { + topic = options.Exchange + } + // encode message body cf, err := r.newCodec(msg.ContentType()) if err != nil { @@ -515,7 +528,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt } b := &buffer{bytes.NewBuffer(nil)} if err := cf(b).Write(&codec.Message{ - Target: msg.Topic(), + Target: topic, Type: codec.Publication, Header: map[string]string{ "Micro-Id": id, @@ -528,7 +541,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt r.opts.Broker.Connect() }) - return r.opts.Broker.Publish(msg.Topic(), &broker.Message{ + return r.opts.Broker.Publish(topic, &broker.Message{ Header: md, Body: b.Bytes(), })