| @@ -12,7 +12,7 @@ import ( | |||||||
| type Client interface { | type Client interface { | ||||||
| 	Init(...Option) error | 	Init(...Option) error | ||||||
| 	Options() Options | 	Options() Options | ||||||
| 	NewMessage(topic string, msg interface{}) Message | 	NewMessage(topic string, msg interface{}, opts ...MessageOption) Message | ||||||
| 	NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request | 	NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request | ||||||
| 	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error | 	Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error | ||||||
| 	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) | 	Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) | ||||||
| @@ -56,6 +56,9 @@ type CallOption func(*CallOptions) | |||||||
| // PublishOption used by Publish | // PublishOption used by Publish | ||||||
| type PublishOption func(*PublishOptions) | type PublishOption func(*PublishOptions) | ||||||
|  |  | ||||||
|  | // MessageOption used by NewMessage | ||||||
|  | type MessageOption func(*MessageOptions) | ||||||
|  |  | ||||||
| // RequestOption used by NewRequest | // RequestOption used by NewRequest | ||||||
| type RequestOption func(*RequestOptions) | type RequestOption func(*RequestOptions) | ||||||
|  |  | ||||||
| @@ -83,13 +86,13 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca | |||||||
|  |  | ||||||
| // Publishes a publication using the default client. Using the underlying broker | // Publishes a publication using the default client. Using the underlying broker | ||||||
| // set within the options. | // set within the options. | ||||||
| func Publish(ctx context.Context, msg Message) error { | func Publish(ctx context.Context, msg Message, opts ...PublishOption) error { | ||||||
| 	return DefaultClient.Publish(ctx, msg) | 	return DefaultClient.Publish(ctx, msg, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Creates a new message using the default client | // Creates a new message using the default client | ||||||
| func NewMessage(topic string, payload interface{}) Message { | func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message { | ||||||
| 	return DefaultClient.NewMessage(topic, payload) | 	return DefaultClient.NewMessage(topic, payload, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Creates a new client with the options passed in | // Creates a new client with the options passed in | ||||||
|   | |||||||
| @@ -49,8 +49,8 @@ func (m *MockClient) Options() client.Options { | |||||||
| 	return m.Opts | 	return m.Opts | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *MockClient) NewMessage(topic string, msg interface{}) client.Message { | func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { | ||||||
| 	return m.Client.NewMessage(topic, msg) | 	return m.Client.NewMessage(topic, msg, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { | func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { | ||||||
|   | |||||||
| @@ -67,6 +67,10 @@ type PublishOptions struct { | |||||||
| 	Context context.Context | 	Context context.Context | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type MessageOptions struct { | ||||||
|  | 	ContentType string | ||||||
|  | } | ||||||
|  |  | ||||||
| type RequestOptions struct { | type RequestOptions struct { | ||||||
| 	ContentType string | 	ContentType string | ||||||
| 	Stream      bool | 	Stream      bool | ||||||
|   | |||||||
| @@ -465,8 +465,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcClient) NewMessage(topic string, message interface{}) Message { | func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message { | ||||||
| 	return newMessage(topic, message, r.opts.ContentType) | 	return newMessage(topic, message, r.opts.ContentType, opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { | func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { | ||||||
|   | |||||||
| @@ -6,7 +6,16 @@ type message struct { | |||||||
| 	payload     interface{} | 	payload     interface{} | ||||||
| } | } | ||||||
|  |  | ||||||
| func newMessage(topic string, payload interface{}, contentType string) Message { | func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message { | ||||||
|  | 	var options MessageOptions | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(options.ContentType) > 0 { | ||||||
|  | 		contentType = options.ContentType | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return &message{ | 	return &message{ | ||||||
| 		payload:     payload, | 		payload:     payload, | ||||||
| 		topic:       topic, | 		topic:       topic, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user