broker: fix message options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -41,7 +41,7 @@ type Broker interface { | |||||||
| 	// Disconnect disconnect from broker | 	// Disconnect disconnect from broker | ||||||
| 	Disconnect(ctx context.Context) error | 	Disconnect(ctx context.Context) error | ||||||
| 	// NewMessage create new broker message to publish. | 	// NewMessage create new broker message to publish. | ||||||
| 	NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) | 	NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) | ||||||
| 	// Publish message to broker topic | 	// Publish message to broker topic | ||||||
| 	Publish(ctx context.Context, topic string, messages ...Message) error | 	Publish(ctx context.Context, topic string, messages ...Message) error | ||||||
| 	// Subscribe subscribes to topic message via handler | 	// Subscribe subscribes to topic message via handler | ||||||
|   | |||||||
| @@ -42,9 +42,9 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // SetPublishOption returns a function to setup a context with given value | // SetMessageOption returns a function to setup a context with given value | ||||||
| func SetPublishOption(k, v interface{}) PublishOption { | func SetMessageOption(k, v interface{}) MessageOption { | ||||||
| 	return func(o *PublishOptions) { | 	return func(o *MessageOptions) { | ||||||
| 		if o.Context == nil { | 		if o.Context == nil { | ||||||
| 			o.Context = context.Background() | 			o.Context = context.Background() | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -32,7 +32,7 @@ type memoryMessage struct { | |||||||
| 	ctx   context.Context | 	ctx   context.Context | ||||||
| 	body  []byte | 	body  []byte | ||||||
| 	hdr   metadata.Metadata | 	hdr   metadata.Metadata | ||||||
| 	opts  broker.PublishOptions | 	opts  broker.MessageOptions | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *memoryMessage) Ack() error { | func (m *memoryMessage) Ack() error { | ||||||
| @@ -157,8 +157,8 @@ func (b *Broker) Init(opts ...broker.Option) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) { | func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) { | ||||||
| 	options := broker.NewPublishOptions(opts...) | 	options := broker.NewMessageOptions(opts...) | ||||||
| 	if options.ContentType == "" { | 	if options.ContentType == "" { | ||||||
| 		options.ContentType = b.opts.ContentType | 		options.ContentType = b.opts.ContentType | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -49,7 +49,7 @@ func TestMemoryBroker(t *testing.T) { | |||||||
| 				"id", fmt.Sprintf("%d", i), | 				"id", fmt.Sprintf("%d", i), | ||||||
| 			), | 			), | ||||||
| 			[]byte(`"hello world"`), | 			[]byte(`"hello world"`), | ||||||
| 			broker.PublishContentType("application/octet-stream"), | 			broker.MessageContentType("application/octet-stream"), | ||||||
| 		) | 		) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Fatal(err) | 			t.Fatal(err) | ||||||
|   | |||||||
| @@ -99,7 +99,7 @@ type noopMessage struct { | |||||||
| 	ctx  context.Context | 	ctx  context.Context | ||||||
| 	body []byte | 	body []byte | ||||||
| 	hdr  metadata.Metadata | 	hdr  metadata.Metadata | ||||||
| 	opts PublishOptions | 	opts MessageOptions | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *noopMessage) Ack() error { | func (m *noopMessage) Ack() error { | ||||||
| @@ -126,8 +126,8 @@ func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error { | |||||||
| 	return m.c.Unmarshal(m.body, dst) | 	return m.c.Unmarshal(m.body, dst) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) { | func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) { | ||||||
| 	options := NewPublishOptions(opts...) | 	options := NewMessageOptions(opts...) | ||||||
| 	if options.ContentType == "" { | 	if options.ContentType == "" { | ||||||
| 		options.ContentType = b.opts.ContentType | 		options.ContentType = b.opts.ContentType | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -87,8 +87,8 @@ func ContentType(ct string) Option { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // PublishOptions struct | // MessageOptions struct | ||||||
| type PublishOptions struct { | type MessageOptions struct { | ||||||
| 	// ContentType for message body | 	// ContentType for message body | ||||||
| 	ContentType string | 	ContentType string | ||||||
| 	// BodyOnly flag says the message contains raw body bytes and don't need | 	// BodyOnly flag says the message contains raw body bytes and don't need | ||||||
| @@ -98,9 +98,9 @@ type PublishOptions struct { | |||||||
| 	Context context.Context | 	Context context.Context | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewPublishOptions creates PublishOptions struct | // NewMessageOptions creates MessageOptions struct | ||||||
| func NewPublishOptions(opts ...PublishOption) PublishOptions { | func NewMessageOptions(opts ...MessageOption) MessageOptions { | ||||||
| 	options := PublishOptions{ | 	options := MessageOptions{ | ||||||
| 		Context: context.Background(), | 		Context: context.Background(), | ||||||
| 	} | 	} | ||||||
| 	for _, o := range opts { | 	for _, o := range opts { | ||||||
| @@ -128,19 +128,19 @@ type SubscribeOptions struct { | |||||||
| // Option func | // Option func | ||||||
| type Option func(*Options) | type Option func(*Options) | ||||||
|  |  | ||||||
| // PublishOption func | // MessageOption func | ||||||
| type PublishOption func(*PublishOptions) | type MessageOption func(*MessageOptions) | ||||||
|  |  | ||||||
| // PublishContentType sets message content-type that used to Marshal | // MessageContentType sets message content-type that used to Marshal | ||||||
| func PublishContentType(ct string) PublishOption { | func MessageContentType(ct string) MessageOption { | ||||||
| 	return func(o *PublishOptions) { | 	return func(o *MessageOptions) { | ||||||
| 		o.ContentType = ct | 		o.ContentType = ct | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // PublishBodyOnly publish only body of the message | // MessageBodyOnly publish only body of the message | ||||||
| func PublishBodyOnly(b bool) PublishOption { | func MessageBodyOnly(b bool) MessageOption { | ||||||
| 	return func(o *PublishOptions) { | 	return func(o *MessageOptions) { | ||||||
| 		o.BodyOnly = b | 		o.BodyOnly = b | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user