add message options

This commit is contained in:
Asim Aslam 2018-05-10 17:33:54 +01:00
parent b39ec4472c
commit c3c0543733
4 changed files with 24 additions and 8 deletions

View File

@ -12,7 +12,7 @@ import (
type Client interface { type Client interface {
Init(...Option) error Init(...Option) error
Options() Options Options() Options
NewMessage(topic string, msg interface{}) Message NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
@ -56,6 +56,9 @@ type CallOption func(*CallOptions)
// PublishOption used by Publish // PublishOption used by Publish
type PublishOption func(*PublishOptions) type PublishOption func(*PublishOptions)
// MessageOption used by NewMessage
type MessageOption func(*MessageOptions)
// RequestOption used by NewRequest // RequestOption used by NewRequest
type RequestOption func(*RequestOptions) type RequestOption func(*RequestOptions)
@ -83,13 +86,13 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
// Publishes a publication using the default client. Using the underlying broker // Publishes a publication using the default client. Using the underlying broker
// set within the options. // set within the options.
func Publish(ctx context.Context, msg Message) error { func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
return DefaultClient.Publish(ctx, msg) return DefaultClient.Publish(ctx, msg, opts...)
} }
// Creates a new message using the default client // Creates a new message using the default client
func NewMessage(topic string, payload interface{}) Message { func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
return DefaultClient.NewMessage(topic, payload) return DefaultClient.NewMessage(topic, payload, opts...)
} }
// Creates a new client with the options passed in // Creates a new client with the options passed in

View File

@ -67,6 +67,10 @@ type PublishOptions struct {
Context context.Context Context context.Context
} }
type MessageOptions struct {
ContentType string
}
type RequestOptions struct { type RequestOptions struct {
ContentType string ContentType string
Stream bool Stream bool

View File

@ -465,8 +465,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
}) })
} }
func (r *rpcClient) NewMessage(topic string, message interface{}) Message { func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {
return newMessage(topic, message, r.opts.ContentType) return newMessage(topic, message, r.opts.ContentType, opts...)
} }
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {

View File

@ -6,7 +6,16 @@ type message struct {
payload interface{} payload interface{}
} }
func newMessage(topic string, payload interface{}, contentType string) Message { func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message {
var options MessageOptions
for _, o := range opts {
o(&options)
}
if len(options.ContentType) > 0 {
contentType = options.ContentType
}
return &message{ return &message{
payload: payload, payload: payload,
topic: topic, topic: topic,