broker: fix message options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
@@ -41,7 +41,7 @@ type Broker interface {
|
|||||||
// Disconnect disconnect from broker
|
// Disconnect disconnect from broker
|
||||||
Disconnect(ctx context.Context) error
|
Disconnect(ctx context.Context) error
|
||||||
// NewMessage create new broker message to publish.
|
// NewMessage create new broker message to publish.
|
||||||
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error)
|
NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error)
|
||||||
// Publish message to broker topic
|
// Publish message to broker topic
|
||||||
Publish(ctx context.Context, topic string, messages ...Message) error
|
Publish(ctx context.Context, topic string, messages ...Message) error
|
||||||
// Subscribe subscribes to topic message via handler
|
// Subscribe subscribes to topic message via handler
|
||||||
|
@@ -42,9 +42,9 @@ func SetSubscribeOption(k, v interface{}) SubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPublishOption returns a function to setup a context with given value
|
// SetMessageOption returns a function to setup a context with given value
|
||||||
func SetPublishOption(k, v interface{}) PublishOption {
|
func SetMessageOption(k, v interface{}) MessageOption {
|
||||||
return func(o *PublishOptions) {
|
return func(o *MessageOptions) {
|
||||||
if o.Context == nil {
|
if o.Context == nil {
|
||||||
o.Context = context.Background()
|
o.Context = context.Background()
|
||||||
}
|
}
|
||||||
|
@@ -32,7 +32,7 @@ type memoryMessage struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
body []byte
|
body []byte
|
||||||
hdr metadata.Metadata
|
hdr metadata.Metadata
|
||||||
opts broker.PublishOptions
|
opts broker.MessageOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryMessage) Ack() error {
|
func (m *memoryMessage) Ack() error {
|
||||||
@@ -157,8 +157,8 @@ func (b *Broker) Init(opts ...broker.Option) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) {
|
func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) {
|
||||||
options := broker.NewPublishOptions(opts...)
|
options := broker.NewMessageOptions(opts...)
|
||||||
if options.ContentType == "" {
|
if options.ContentType == "" {
|
||||||
options.ContentType = b.opts.ContentType
|
options.ContentType = b.opts.ContentType
|
||||||
}
|
}
|
||||||
|
@@ -49,7 +49,7 @@ func TestMemoryBroker(t *testing.T) {
|
|||||||
"id", fmt.Sprintf("%d", i),
|
"id", fmt.Sprintf("%d", i),
|
||||||
),
|
),
|
||||||
[]byte(`"hello world"`),
|
[]byte(`"hello world"`),
|
||||||
broker.PublishContentType("application/octet-stream"),
|
broker.MessageContentType("application/octet-stream"),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@@ -99,7 +99,7 @@ type noopMessage struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
body []byte
|
body []byte
|
||||||
hdr metadata.Metadata
|
hdr metadata.Metadata
|
||||||
opts PublishOptions
|
opts MessageOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *noopMessage) Ack() error {
|
func (m *noopMessage) Ack() error {
|
||||||
@@ -126,8 +126,8 @@ func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
|
|||||||
return m.c.Unmarshal(m.body, dst)
|
return m.c.Unmarshal(m.body, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) {
|
func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...MessageOption) (Message, error) {
|
||||||
options := NewPublishOptions(opts...)
|
options := NewMessageOptions(opts...)
|
||||||
if options.ContentType == "" {
|
if options.ContentType == "" {
|
||||||
options.ContentType = b.opts.ContentType
|
options.ContentType = b.opts.ContentType
|
||||||
}
|
}
|
||||||
|
@@ -87,8 +87,8 @@ func ContentType(ct string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishOptions struct
|
// MessageOptions struct
|
||||||
type PublishOptions struct {
|
type MessageOptions struct {
|
||||||
// ContentType for message body
|
// ContentType for message body
|
||||||
ContentType string
|
ContentType string
|
||||||
// BodyOnly flag says the message contains raw body bytes and don't need
|
// BodyOnly flag says the message contains raw body bytes and don't need
|
||||||
@@ -98,9 +98,9 @@ type PublishOptions struct {
|
|||||||
Context context.Context
|
Context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPublishOptions creates PublishOptions struct
|
// NewMessageOptions creates MessageOptions struct
|
||||||
func NewPublishOptions(opts ...PublishOption) PublishOptions {
|
func NewMessageOptions(opts ...MessageOption) MessageOptions {
|
||||||
options := PublishOptions{
|
options := MessageOptions{
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
@@ -128,19 +128,19 @@ type SubscribeOptions struct {
|
|||||||
// Option func
|
// Option func
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
// PublishOption func
|
// MessageOption func
|
||||||
type PublishOption func(*PublishOptions)
|
type MessageOption func(*MessageOptions)
|
||||||
|
|
||||||
// PublishContentType sets message content-type that used to Marshal
|
// MessageContentType sets message content-type that used to Marshal
|
||||||
func PublishContentType(ct string) PublishOption {
|
func MessageContentType(ct string) MessageOption {
|
||||||
return func(o *PublishOptions) {
|
return func(o *MessageOptions) {
|
||||||
o.ContentType = ct
|
o.ContentType = ct
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PublishBodyOnly publish only body of the message
|
// MessageBodyOnly publish only body of the message
|
||||||
func PublishBodyOnly(b bool) PublishOption {
|
func MessageBodyOnly(b bool) MessageOption {
|
||||||
return func(o *PublishOptions) {
|
return func(o *MessageOptions) {
|
||||||
o.BodyOnly = b
|
o.BodyOnly = b
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user