couple of fixes

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-10 23:44:15 +03:00
parent ba37142442
commit e1ff56f784

50
stan.go
View File

@ -97,12 +97,7 @@ func (n *subscriber) Close() error {
} }
func (n *stanBroker) Address() string { func (n *stanBroker) Address() string {
// stan does not support connected server info return strings.Join(n.addrs, ",")
if len(n.addrs) > 0 {
return n.addrs[0]
}
return ""
} }
func setAddrs(addrs []string) []string { func setAddrs(addrs []string) []string {
@ -299,23 +294,29 @@ func (n *stanBroker) Options() broker.Options {
} }
func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error { func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error {
msgs := make(map[string][][]byte) var err error
var buf []byte
var wg sync.WaitGroup var wg sync.WaitGroup
msgs := make(map[string][][]byte)
options := broker.NewPublishOptions(opts...)
wg.Add(len(msg)) wg.Add(len(msg))
for _, m := range msg { for _, m := range msg {
b, err := n.opts.Codec.Marshal(m) if options.BodyOnly {
if err != nil { buf = m.Body
return err } else {
buf, err = n.opts.Codec.Marshal(m)
if err != nil {
return err
}
} }
topic, _ := m.Header.Get(metadata.HeaderTopic) topic, _ := m.Header.Get(metadata.HeaderTopic)
msgs[topic] = append(msgs[topic], b) msgs[topic] = append(msgs[topic], buf)
} }
n.RLock()
defer n.RUnlock()
var ackErr error var ackErr error
ackHandler := func(ackedNuid string, err error) { ackHandler := func(ackedNuid string, err error) {
@ -325,6 +326,9 @@ func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op
} }
} }
n.RLock()
defer n.RUnlock()
for topic, ms := range msgs { for topic, ms := range msgs {
for _, m := range ms { for _, m := range ms {
if _, err := n.conn.PublishAsync(topic, m, ackHandler); err != nil { if _, err := n.conn.PublishAsync(topic, m, ackHandler); err != nil {
@ -339,13 +343,23 @@ func (n *stanBroker) BatchPublish(ctx context.Context, msg []*broker.Message, op
} }
func (n *stanBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { func (n *stanBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
b, err := n.opts.Codec.Marshal(msg) var buf []byte
if err != nil { var err error
return err
options := broker.NewPublishOptions(opts...)
if options.BodyOnly {
buf = msg.Body
} else {
buf, err = n.opts.Codec.Marshal(msg)
if err != nil {
return err
}
} }
n.RLock() n.RLock()
defer n.RUnlock() defer n.RUnlock()
return n.conn.Publish(topic, b) return n.conn.Publish(topic, buf)
} }
func (n *stanBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (n *stanBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {