Add Exchange option

This commit is contained in:
Asim Aslam 2019-02-23 10:50:53 +00:00
parent 7db2912d90
commit 58adaef339
2 changed files with 24 additions and 2 deletions

View File

@ -65,6 +65,8 @@ type CallOptions struct {
} }
type PublishOptions struct { type PublishOptions struct {
// Exchange is the routing exchange for the message
Exchange string
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
Context context.Context Context context.Context
@ -236,6 +238,13 @@ func DialTimeout(d time.Duration) Option {
// Call Options // Call Options
// WithExchange sets the exchange to route a message through
func WithExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// WithAddress sets the remote address to use rather than using service discovery // WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption { func WithAddress(a string) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {

View File

@ -498,6 +498,13 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
} }
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error { func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx) md, ok := metadata.FromContext(ctx)
if !ok { if !ok {
md = make(map[string]string) md = make(map[string]string)
@ -508,6 +515,12 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
md["Micro-Topic"] = msg.Topic() md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id md["Micro-Id"] = id
// get the routing exchange
topic := msg.Topic()
if len(options.Exchange) > 0 {
topic = options.Exchange
}
// encode message body // encode message body
cf, err := r.newCodec(msg.ContentType()) cf, err := r.newCodec(msg.ContentType())
if err != nil { if err != nil {
@ -515,7 +528,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
} }
b := &buffer{bytes.NewBuffer(nil)} b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{ if err := cf(b).Write(&codec.Message{
Target: msg.Topic(), Target: topic,
Type: codec.Publication, Type: codec.Publication,
Header: map[string]string{ Header: map[string]string{
"Micro-Id": id, "Micro-Id": id,
@ -528,7 +541,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
r.opts.Broker.Connect() r.opts.Broker.Connect()
}) })
return r.opts.Broker.Publish(msg.Topic(), &broker.Message{ return r.opts.Broker.Publish(topic, &broker.Message{
Header: md, Header: md,
Body: b.Bytes(), Body: b.Bytes(),
}) })