use native nats headers

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-23 08:11:37 +03:00
parent dd75f63d37
commit 960033dfe2

114
nats.go
View File

@ -15,6 +15,12 @@ import (
"golang.org/x/sync/errgroup"
)
var pPool = sync.Pool{
New: func() interface{} {
return &publication{msg: broker.NewMessage("")}
},
}
type natsBroker struct {
sync.Once
sync.RWMutex
@ -33,17 +39,17 @@ type natsBroker struct {
}
type publication struct {
t string
topic string
err error
m *broker.Message
msg *broker.Message
}
func (p *publication) Topic() string {
return p.t
return p.topic
}
func (p *publication) Message() *broker.Message {
return p.m
return p.msg
}
func (p *publication) Ack() error {
@ -186,19 +192,33 @@ func (n *natsBroker) Options() broker.Options {
return n.opts
}
func (n *natsBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error {
msgs := make(map[string][][]byte)
func (n *natsBroker) BatchPublish(ctx context.Context, p []*broker.Message, opts ...broker.PublishOption) error {
var err error
msgs := make([]*nats.Msg, len(p))
var wg sync.WaitGroup
wg.Add(len(msg))
wg.Add(len(p))
for _, m := range msg {
b, err := n.opts.Codec.Marshal(m)
options := broker.NewPublishOptions(opts...)
for _, m := range p {
rec := &nats.Msg{}
rec.Subject, _ = m.Header.Get(metadata.HeaderTopic)
if options.BodyOnly {
rec.Data = m.Body
} else if n.opts.Codec.String() == "noop" {
rec.Data = m.Body
rec.Header = make(nats.Header, len(m.Header))
for k, v := range m.Header {
rec.Header.Add(k, v)
}
} else {
rec.Data, err = n.opts.Codec.Marshal(m)
if err != nil {
return err
}
topic, _ := m.Header.Get(metadata.HeaderTopic)
msgs[topic] = append(msgs[topic], b)
}
msgs = append(msgs, rec)
}
n.RLock()
@ -206,22 +226,18 @@ func (n *natsBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op
g := errgroup.Group{}
for topic, ms := range msgs {
for _, m := range ms {
for _, ms := range msgs {
m := ms
g.Go(func() error {
return n.conn.Publish(topic, m)
return n.conn.PublishMsg(m)
})
}
}
return g.Wait()
}
func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
var err error
var buf []byte
options := broker.NewPublishOptions(opts...)
n.RLock()
if n.conn == nil {
@ -230,10 +246,20 @@ func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Mess
}
n.RUnlock()
options := broker.NewPublishOptions(opts...)
rec := &nats.Msg{}
rec.Subject, _ = msg.Header.Get(metadata.HeaderTopic)
if options.BodyOnly {
buf = msg.Body
rec.Data = msg.Body
} else if n.opts.Codec.String() == "noop" {
rec.Data = msg.Body
rec.Header = make(nats.Header, len(msg.Header))
for k, v := range msg.Header {
rec.Header.Add(k, v)
}
} else {
buf, err = n.opts.Codec.Marshal(msg)
rec.Data, err = n.opts.Codec.Marshal(msg)
if err != nil {
return err
}
@ -241,7 +267,8 @@ func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Mess
n.RLock()
defer n.RUnlock()
return n.conn.Publish(topic, buf)
return n.conn.PublishMsg(rec)
}
func (n *natsBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
@ -256,21 +283,33 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker
}
n.RUnlock()
opt := broker.NewSubscribeOptions(opts...)
fn := func(msg *nats.Msg) {
m := &broker.Message{}
pub := &publication{t: msg.Subject}
options := broker.NewSubscribeOptions(opts...)
eh := n.opts.ErrorHandler
if opt.ErrorHandler != nil {
eh = opt.ErrorHandler
if options.ErrorHandler != nil {
eh = options.ErrorHandler
}
err := n.opts.Codec.Unmarshal(msg.Data, &m)
fn := func(msg *nats.Msg) {
pub := pPool.Get().(*publication)
pub.msg.Header = nil
pub.msg.Body = nil
pub.topic = msg.Subject
pub.err = nil
if options.BodyOnly {
pub.msg.Body = msg.Data
} else if n.opts.Codec.String() == "noop" {
pub.msg.Body = msg.Data
pub.msg.Header = metadata.New(len(msg.Header))
for k, v := range msg.Header {
pub.msg.Header.Set(k, strings.Join(v, ","))
}
} else {
err := n.opts.Codec.Unmarshal(msg.Data, pub.msg)
pub.err = err
pub.m = m
if err != nil {
pub.m.Body = msg.Data
pub.msg.Body = msg.Data
if eh != nil {
eh(pub)
} else {
@ -280,6 +319,7 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker
}
return
}
}
if err := handler(pub); err != nil {
pub.err = err
if eh != nil {
@ -296,8 +336,8 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker
var err error
n.RLock()
if len(opt.Group) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Group, fn)
if len(options.Group) > 0 {
sub, err = n.conn.QueueSubscribe(topic, options.Group, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
@ -305,7 +345,7 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker
if err != nil {
return nil, err
}
return &subscriber{s: sub, opts: opt}, nil
return &subscriber{s: sub, opts: options}, nil
}
func (n *natsBroker) String() string {
@ -366,9 +406,13 @@ func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
n.closeCh <- err
}
func NewBroker(opts ...broker.Option) broker.Broker {
func NewBroker(opts ...broker.Option) *natsBroker {
options := broker.NewOptions(opts...)
if options.Codec.String() != "noop" {
options.Logger.Infof(options.Context, "broker codec not noop, disable plain nats headers usage")
}
n := &natsBroker{
opts: options,
}