From 029a434a2bfd1fb931fdac87eb6f292568182fcb Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 9 May 2025 13:51:31 +0300 Subject: [PATCH] broker: pass broker content type if message options not pass it Signed-off-by: Vasiliy Tolstov --- broker/memory/memory.go | 3 +++ broker/noop.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/broker/memory/memory.go b/broker/memory/memory.go index c95c1b0c..058f057d 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -159,6 +159,9 @@ func (b *Broker) Init(opts ...broker.Option) error { func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.PublishOption) (broker.Message, error) { options := broker.NewPublishOptions(opts...) + if options.ContentType == "" { + options.ContentType = b.opts.ContentType + } m := &memoryMessage{ctx: ctx, hdr: hdr, opts: options} c, err := b.newCodec(m.opts.ContentType) if err == nil { diff --git a/broker/noop.go b/broker/noop.go index 4d160f3d..7bb745ba 100644 --- a/broker/noop.go +++ b/broker/noop.go @@ -128,6 +128,9 @@ func (m *noopMessage) Unmarshal(dst interface{}, opts ...codec.Option) error { func (b *NoopBroker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...PublishOption) (Message, error) { options := NewPublishOptions(opts...) + if options.ContentType == "" { + options.ContentType = b.opts.ContentType + } m := &noopMessage{ctx: ctx, hdr: hdr, opts: options} c, err := b.newCodec(m.opts.ContentType) if err == nil {