diff --git a/client/client.go b/client/client.go index aa2178d3..78352657 100644 --- a/client/client.go +++ b/client/client.go @@ -12,7 +12,7 @@ import ( type Client interface { Init(...Option) error Options() Options - NewMessage(topic string, msg interface{}) Message + NewMessage(topic string, msg interface{}, opts ...MessageOption) Message NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) @@ -56,6 +56,9 @@ type CallOption func(*CallOptions) // PublishOption used by Publish type PublishOption func(*PublishOptions) +// MessageOption used by NewMessage +type MessageOption func(*MessageOptions) + // RequestOption used by NewRequest type RequestOption func(*RequestOptions) @@ -83,13 +86,13 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca // Publishes a publication using the default client. Using the underlying broker // set within the options. -func Publish(ctx context.Context, msg Message) error { - return DefaultClient.Publish(ctx, msg) +func Publish(ctx context.Context, msg Message, opts ...PublishOption) error { + return DefaultClient.Publish(ctx, msg, opts...) } // Creates a new message using the default client -func NewMessage(topic string, payload interface{}) Message { - return DefaultClient.NewMessage(topic, payload) +func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message { + return DefaultClient.NewMessage(topic, payload, opts...) } // Creates a new client with the options passed in diff --git a/client/mock/mock.go b/client/mock/mock.go index c22d84d6..4a1e520e 100644 --- a/client/mock/mock.go +++ b/client/mock/mock.go @@ -49,8 +49,8 @@ func (m *MockClient) Options() client.Options { return m.Opts } -func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message { - return m.Client.NewMessage(topic, msg) +func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + return m.Client.NewMessage(topic, msg, opts...) } func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { diff --git a/client/options.go b/client/options.go index 2d465850..8b924038 100644 --- a/client/options.go +++ b/client/options.go @@ -67,6 +67,10 @@ type PublishOptions struct { Context context.Context } +type MessageOptions struct { + ContentType string +} + type RequestOptions struct { ContentType string Stream bool diff --git a/client/rpc_client.go b/client/rpc_client.go index b71b6c91..1ceef41c 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -465,8 +465,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt }) } -func (r *rpcClient) NewMessage(topic string, message interface{}) Message { - return newMessage(topic, message, r.opts.ContentType) +func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message { + return newMessage(topic, message, r.opts.ContentType, opts...) } func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { diff --git a/client/rpc_message.go b/client/rpc_message.go index 13bd15c7..418c756f 100644 --- a/client/rpc_message.go +++ b/client/rpc_message.go @@ -6,7 +6,16 @@ type message struct { payload interface{} } -func newMessage(topic string, payload interface{}, contentType string) Message { +func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message { + var options MessageOptions + for _, o := range opts { + o(&options) + } + + if len(options.ContentType) > 0 { + contentType = options.ContentType + } + return &message{ payload: payload, topic: topic,