diff --git a/nats.go b/nats.go index 672f531..392d054 100644 --- a/nats.go +++ b/nats.go @@ -15,6 +15,12 @@ import ( "golang.org/x/sync/errgroup" ) +var pPool = sync.Pool{ + New: func() interface{} { + return &publication{msg: broker.NewMessage("")} + }, +} + type natsBroker struct { sync.Once sync.RWMutex @@ -33,17 +39,17 @@ type natsBroker struct { } type publication struct { - t string - err error - m *broker.Message + topic string + err error + msg *broker.Message } func (p *publication) Topic() string { - return p.t + return p.topic } func (p *publication) Message() *broker.Message { - return p.m + return p.msg } func (p *publication) Ack() error { @@ -186,19 +192,33 @@ func (n *natsBroker) Options() broker.Options { return n.opts } -func (n *natsBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error { - msgs := make(map[string][][]byte) +func (n *natsBroker) BatchPublish(ctx context.Context, p []*broker.Message, opts ...broker.PublishOption) error { + var err error + msgs := make([]*nats.Msg, len(p)) var wg sync.WaitGroup - wg.Add(len(msg)) + wg.Add(len(p)) - for _, m := range msg { - b, err := n.opts.Codec.Marshal(m) - if err != nil { - return err + options := broker.NewPublishOptions(opts...) + + for _, m := range p { + rec := &nats.Msg{} + rec.Subject, _ = m.Header.Get(metadata.HeaderTopic) + if options.BodyOnly { + rec.Data = m.Body + } else if n.opts.Codec.String() == "noop" { + rec.Data = m.Body + rec.Header = make(nats.Header, len(m.Header)) + for k, v := range m.Header { + rec.Header.Add(k, v) + } + } else { + rec.Data, err = n.opts.Codec.Marshal(m) + if err != nil { + return err + } } - topic, _ := m.Header.Get(metadata.HeaderTopic) - msgs[topic] = append(msgs[topic], b) + msgs = append(msgs, rec) } n.RLock() @@ -206,12 +226,11 @@ func (n *natsBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op g := errgroup.Group{} - for topic, ms := range msgs { - for _, m := range ms { - g.Go(func() error { - return n.conn.Publish(topic, m) - }) - } + for _, ms := range msgs { + m := ms + g.Go(func() error { + return n.conn.PublishMsg(m) + }) } return g.Wait() @@ -219,9 +238,6 @@ func (n *natsBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { var err error - var buf []byte - - options := broker.NewPublishOptions(opts...) n.RLock() if n.conn == nil { @@ -230,10 +246,20 @@ func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Mess } n.RUnlock() + options := broker.NewPublishOptions(opts...) + + rec := &nats.Msg{} + rec.Subject, _ = msg.Header.Get(metadata.HeaderTopic) if options.BodyOnly { - buf = msg.Body + rec.Data = msg.Body + } else if n.opts.Codec.String() == "noop" { + rec.Data = msg.Body + rec.Header = make(nats.Header, len(msg.Header)) + for k, v := range msg.Header { + rec.Header.Add(k, v) + } } else { - buf, err = n.opts.Codec.Marshal(msg) + rec.Data, err = n.opts.Codec.Marshal(msg) if err != nil { return err } @@ -241,7 +267,8 @@ func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Mess n.RLock() defer n.RUnlock() - return n.conn.Publish(topic, buf) + + return n.conn.PublishMsg(rec) } func (n *natsBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { @@ -256,29 +283,42 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker } n.RUnlock() - opt := broker.NewSubscribeOptions(opts...) + options := broker.NewSubscribeOptions(opts...) + + eh := n.opts.ErrorHandler + if options.ErrorHandler != nil { + eh = options.ErrorHandler + } fn := func(msg *nats.Msg) { - m := &broker.Message{} - pub := &publication{t: msg.Subject} + pub := pPool.Get().(*publication) + pub.msg.Header = nil + pub.msg.Body = nil + pub.topic = msg.Subject + pub.err = nil - eh := n.opts.ErrorHandler - if opt.ErrorHandler != nil { - eh = opt.ErrorHandler - } - err := n.opts.Codec.Unmarshal(msg.Data, &m) - pub.err = err - pub.m = m - if err != nil { - pub.m.Body = msg.Data - if eh != nil { - eh(pub) - } else { - if n.opts.Logger.V(logger.ErrorLevel) { - n.opts.Logger.Error(n.opts.Context, err) - } + if options.BodyOnly { + pub.msg.Body = msg.Data + } else if n.opts.Codec.String() == "noop" { + pub.msg.Body = msg.Data + pub.msg.Header = metadata.New(len(msg.Header)) + for k, v := range msg.Header { + pub.msg.Header.Set(k, strings.Join(v, ",")) + } + } else { + err := n.opts.Codec.Unmarshal(msg.Data, pub.msg) + pub.err = err + if err != nil { + pub.msg.Body = msg.Data + if eh != nil { + eh(pub) + } else { + if n.opts.Logger.V(logger.ErrorLevel) { + n.opts.Logger.Error(n.opts.Context, err) + } + } + return } - return } if err := handler(pub); err != nil { pub.err = err @@ -296,8 +336,8 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker var err error n.RLock() - if len(opt.Group) > 0 { - sub, err = n.conn.QueueSubscribe(topic, opt.Group, fn) + if len(options.Group) > 0 { + sub, err = n.conn.QueueSubscribe(topic, options.Group, fn) } else { sub, err = n.conn.Subscribe(topic, fn) } @@ -305,7 +345,7 @@ func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker if err != nil { return nil, err } - return &subscriber{s: sub, opts: opt}, nil + return &subscriber{s: sub, opts: options}, nil } func (n *natsBroker) String() string { @@ -366,9 +406,13 @@ func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) { n.closeCh <- err } -func NewBroker(opts ...broker.Option) broker.Broker { +func NewBroker(opts ...broker.Option) *natsBroker { options := broker.NewOptions(opts...) + if options.Codec.String() != "noop" { + options.Logger.Infof(options.Context, "broker codec not noop, disable plain nats headers usage") + } + n := &natsBroker{ opts: options, }