diff --git a/stan.go b/stan.go index de4a081..7932f3b 100644 --- a/stan.go +++ b/stan.go @@ -97,12 +97,7 @@ func (n *subscriber) Close() error { } func (n *stanBroker) Address() string { - // stan does not support connected server info - if len(n.addrs) > 0 { - return n.addrs[0] - } - - return "" + return strings.Join(n.addrs, ",") } func setAddrs(addrs []string) []string { @@ -299,23 +294,29 @@ func (n *stanBroker) Options() broker.Options { } func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error { - msgs := make(map[string][][]byte) + var err error + var buf []byte + var wg sync.WaitGroup + msgs := make(map[string][][]byte) + + options := broker.NewPublishOptions(opts...) wg.Add(len(msg)) for _, m := range msg { - b, err := n.opts.Codec.Marshal(m) - if err != nil { - return err + if options.BodyOnly { + buf = m.Body + } else { + buf, err = n.opts.Codec.Marshal(m) + if err != nil { + return err + } } topic, _ := m.Header.Get(metadata.HeaderTopic) - msgs[topic] = append(msgs[topic], b) + msgs[topic] = append(msgs[topic], buf) } - n.RLock() - defer n.RUnlock() - var ackErr error ackHandler := func(ackedNuid string, err error) { @@ -325,6 +326,9 @@ func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op } } + n.RLock() + defer n.RUnlock() + for topic, ms := range msgs { for _, m := range ms { if _, err := n.conn.PublishAsync(topic, m, ackHandler); err != nil { @@ -339,13 +343,23 @@ func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op } func (n *stanBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { - b, err := n.opts.Codec.Marshal(msg) - if err != nil { - return err + var buf []byte + var err error + + options := broker.NewPublishOptions(opts...) + + if options.BodyOnly { + buf = msg.Body + } else { + buf, err = n.opts.Codec.Marshal(msg) + if err != nil { + return err + } } + n.RLock() defer n.RUnlock() - return n.conn.Publish(topic, b) + return n.conn.Publish(topic, buf) } func (n *stanBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {