73
nats.go
73
nats.go
@@ -11,6 +11,8 @@ import (
|
||||
nats "github.com/nats-io/nats.go"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type natsBroker struct {
|
||||
@@ -49,6 +51,10 @@ func (p *publication) Ack() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *publication) SetError(err error) {
|
||||
p.err = err
|
||||
}
|
||||
|
||||
func (p *publication) Error() error {
|
||||
return p.err
|
||||
}
|
||||
@@ -75,11 +81,7 @@ func (n *natsBroker) Address() string {
|
||||
return n.conn.ConnectedUrl()
|
||||
}
|
||||
|
||||
if len(n.addrs) > 0 {
|
||||
return n.addrs[0]
|
||||
}
|
||||
|
||||
return ""
|
||||
return strings.Join(n.addrs, ",")
|
||||
}
|
||||
|
||||
func (n *natsBroker) setAddrs(addrs []string) []string {
|
||||
@@ -184,19 +186,66 @@ func (n *natsBroker) Options() broker.Options {
|
||||
return n.opts
|
||||
}
|
||||
|
||||
func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
func (n *natsBroker) BatchPublish(ctx context.Context, msg []*broker.Message, opts ...broker.PublishOption) error {
|
||||
msgs := make(map[string][][]byte)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(len(msg))
|
||||
|
||||
for _, m := range msg {
|
||||
b, err := n.opts.Codec.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
topic, _ := m.Header.Get(metadata.HeaderTopic)
|
||||
msgs[topic] = append(msgs[topic], b)
|
||||
}
|
||||
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
|
||||
if n.conn == nil {
|
||||
return errors.New("not connected")
|
||||
g := errgroup.Group{}
|
||||
|
||||
for topic, ms := range msgs {
|
||||
for _, m := range ms {
|
||||
g.Go(func() error {
|
||||
return n.conn.Publish(topic, m)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
b, err := n.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
var err error
|
||||
var buf []byte
|
||||
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
|
||||
n.RLock()
|
||||
if n.conn == nil {
|
||||
n.RUnlock()
|
||||
return errors.New("not connected")
|
||||
}
|
||||
return n.conn.Publish(topic, b)
|
||||
n.RUnlock()
|
||||
|
||||
if options.BodyOnly {
|
||||
buf = msg.Body
|
||||
} else {
|
||||
buf, err = n.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
n.RLock()
|
||||
defer n.RUnlock()
|
||||
return n.conn.Publish(topic, buf)
|
||||
}
|
||||
|
||||
func (n *natsBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
|
||||
Reference in New Issue
Block a user