2021-01-29 18:20:06 +03:00
|
|
|
// Package nats provides a NATS broker
|
2024-10-12 13:13:52 +03:00
|
|
|
package nats
|
2015-11-25 03:16:12 +03:00
|
|
|
|
|
|
|
import (
|
2021-01-29 18:20:06 +03:00
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
nats "github.com/nats-io/nats.go"
|
2021-10-27 23:01:59 +03:00
|
|
|
"go.unistack.org/micro/v3/broker"
|
|
|
|
"go.unistack.org/micro/v3/logger"
|
|
|
|
"go.unistack.org/micro/v3/metadata"
|
2021-09-10 23:55:08 +03:00
|
|
|
"golang.org/x/sync/errgroup"
|
2015-11-25 03:16:12 +03:00
|
|
|
)
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
var pPool = sync.Pool{
|
|
|
|
New: func() interface{} {
|
|
|
|
return &publication{msg: broker.NewMessage("")}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
type natsBroker struct {
|
|
|
|
sync.Once
|
|
|
|
sync.RWMutex
|
|
|
|
|
|
|
|
// indicate if we're connected
|
|
|
|
connected bool
|
2021-03-24 23:19:12 +03:00
|
|
|
init bool
|
|
|
|
addrs []string
|
|
|
|
conn *nats.Conn
|
|
|
|
opts broker.Options
|
|
|
|
nopts nats.Options
|
2021-01-29 18:20:06 +03:00
|
|
|
|
|
|
|
// should we drain the connection
|
|
|
|
drain bool
|
|
|
|
closeCh chan (error)
|
|
|
|
}
|
|
|
|
|
|
|
|
type publication struct {
|
2021-09-23 08:11:37 +03:00
|
|
|
topic string
|
|
|
|
err error
|
|
|
|
msg *broker.Message
|
2024-10-12 13:13:52 +03:00
|
|
|
ctx context.Context
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *publication) Topic() string {
|
2021-09-23 08:11:37 +03:00
|
|
|
return p.topic
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *publication) Message() *broker.Message {
|
2021-09-23 08:11:37 +03:00
|
|
|
return p.msg
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *publication) Ack() error {
|
|
|
|
// nats does not support acking
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-09-10 23:55:08 +03:00
|
|
|
func (p *publication) SetError(err error) {
|
|
|
|
p.err = err
|
|
|
|
}
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
func (p *publication) Error() error {
|
|
|
|
return p.err
|
|
|
|
}
|
|
|
|
|
2024-10-12 13:13:52 +03:00
|
|
|
func (p *publication) Context() context.Context {
|
|
|
|
return p.ctx
|
|
|
|
}
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
type subscriber struct {
|
|
|
|
s *nats.Subscription
|
|
|
|
opts broker.SubscribeOptions
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *subscriber) Options() broker.SubscribeOptions {
|
|
|
|
return s.opts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *subscriber) Topic() string {
|
|
|
|
return s.s.Subject
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *subscriber) Unsubscribe(ctx context.Context) error {
|
|
|
|
return s.s.Unsubscribe()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Address() string {
|
|
|
|
if n.conn != nil && n.conn.IsConnected() {
|
|
|
|
return n.conn.ConnectedUrl()
|
|
|
|
}
|
|
|
|
|
2021-09-10 23:55:08 +03:00
|
|
|
return strings.Join(n.addrs, ",")
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) setAddrs(addrs []string) []string {
|
|
|
|
//nolint:prealloc
|
|
|
|
var cAddrs []string
|
|
|
|
for _, addr := range addrs {
|
|
|
|
if len(addr) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if !strings.HasPrefix(addr, "nats://") {
|
|
|
|
addr = "nats://" + addr
|
|
|
|
}
|
|
|
|
cAddrs = append(cAddrs, addr)
|
|
|
|
}
|
|
|
|
if len(cAddrs) == 0 {
|
|
|
|
cAddrs = []string{nats.DefaultURL}
|
|
|
|
}
|
|
|
|
return cAddrs
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Connect(ctx context.Context) error {
|
2022-03-05 19:07:59 +03:00
|
|
|
n.RLock()
|
2021-01-29 18:20:06 +03:00
|
|
|
if n.connected {
|
2022-03-05 19:07:59 +03:00
|
|
|
n.RUnlock()
|
2021-01-29 18:20:06 +03:00
|
|
|
return nil
|
|
|
|
}
|
2022-03-05 19:07:59 +03:00
|
|
|
n.RUnlock()
|
2021-01-29 18:20:06 +03:00
|
|
|
|
2022-03-05 19:07:59 +03:00
|
|
|
n.Lock()
|
|
|
|
defer n.Unlock()
|
2021-01-29 18:20:06 +03:00
|
|
|
status := nats.CLOSED
|
|
|
|
if n.conn != nil {
|
|
|
|
status = n.conn.Status()
|
|
|
|
}
|
|
|
|
|
|
|
|
switch status {
|
|
|
|
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
|
|
|
|
n.connected = true
|
|
|
|
return nil
|
|
|
|
default: // DISCONNECTED or CLOSED or DRAINING
|
|
|
|
opts := n.nopts
|
|
|
|
opts.Servers = n.addrs
|
|
|
|
opts.TLSConfig = n.opts.TLSConfig
|
|
|
|
|
|
|
|
c, err := opts.Connect()
|
|
|
|
if err != nil {
|
|
|
|
if n.opts.Logger.V(logger.WarnLevel) {
|
2024-10-12 13:13:52 +03:00
|
|
|
n.opts.Logger.Error(n.opts.Context, "error connecting to broker", err)
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n.conn = c
|
|
|
|
n.connected = true
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Disconnect(ctx context.Context) error {
|
2022-03-05 19:07:59 +03:00
|
|
|
n.RLock()
|
|
|
|
if !n.connected {
|
|
|
|
n.RUnlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
n.RUnlock()
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
n.Lock()
|
|
|
|
defer n.Unlock()
|
|
|
|
// drain the connection if specified
|
|
|
|
if n.drain {
|
2022-03-05 19:07:59 +03:00
|
|
|
if err := n.conn.Drain(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-01-29 18:20:06 +03:00
|
|
|
n.closeCh <- nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// close the client connection
|
|
|
|
n.conn.Close()
|
|
|
|
|
|
|
|
// set not connected
|
|
|
|
n.connected = false
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Init(opts ...broker.Option) error {
|
2021-03-24 23:19:12 +03:00
|
|
|
if len(opts) == 0 && n.init {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := n.opts.Register.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := n.opts.Tracer.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := n.opts.Logger.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := n.opts.Meter.Init(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
n.setOption(opts...)
|
|
|
|
if n.opts.Codec == nil {
|
|
|
|
return fmt.Errorf("codec is nil")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Options() broker.Options {
|
|
|
|
return n.opts
|
|
|
|
}
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
func (n *natsBroker) BatchPublish(ctx context.Context, p []*broker.Message, opts ...broker.PublishOption) error {
|
|
|
|
var err error
|
2024-02-13 19:55:55 +03:00
|
|
|
msgs := make([]*nats.Msg, 0, len(p))
|
2021-09-10 23:55:08 +03:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
wg.Add(len(p))
|
2021-09-10 23:55:08 +03:00
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
options := broker.NewPublishOptions(opts...)
|
|
|
|
|
|
|
|
for _, m := range p {
|
|
|
|
rec := &nats.Msg{}
|
|
|
|
rec.Subject, _ = m.Header.Get(metadata.HeaderTopic)
|
|
|
|
if options.BodyOnly {
|
|
|
|
rec.Data = m.Body
|
|
|
|
} else if n.opts.Codec.String() == "noop" {
|
|
|
|
rec.Data = m.Body
|
|
|
|
rec.Header = make(nats.Header, len(m.Header))
|
|
|
|
for k, v := range m.Header {
|
|
|
|
rec.Header.Add(k, v)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
rec.Data, err = n.opts.Codec.Marshal(m)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-09-10 23:55:08 +03:00
|
|
|
}
|
2021-09-23 08:11:37 +03:00
|
|
|
msgs = append(msgs, rec)
|
2021-09-10 23:55:08 +03:00
|
|
|
}
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
n.RLock()
|
|
|
|
defer n.RUnlock()
|
|
|
|
|
2021-09-10 23:55:08 +03:00
|
|
|
g := errgroup.Group{}
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
for _, ms := range msgs {
|
|
|
|
m := ms
|
|
|
|
g.Go(func() error {
|
|
|
|
return n.conn.PublishMsg(m)
|
|
|
|
})
|
2021-09-10 23:55:08 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
return g.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
n.RLock()
|
2021-01-29 18:20:06 +03:00
|
|
|
if n.conn == nil {
|
2021-09-10 23:55:08 +03:00
|
|
|
n.RUnlock()
|
2021-01-29 18:20:06 +03:00
|
|
|
return errors.New("not connected")
|
|
|
|
}
|
2021-09-10 23:55:08 +03:00
|
|
|
n.RUnlock()
|
2021-01-29 18:20:06 +03:00
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
options := broker.NewPublishOptions(opts...)
|
|
|
|
|
|
|
|
rec := &nats.Msg{}
|
|
|
|
rec.Subject, _ = msg.Header.Get(metadata.HeaderTopic)
|
2021-09-10 23:55:08 +03:00
|
|
|
if options.BodyOnly {
|
2021-09-23 08:11:37 +03:00
|
|
|
rec.Data = msg.Body
|
|
|
|
} else if n.opts.Codec.String() == "noop" {
|
|
|
|
rec.Data = msg.Body
|
|
|
|
rec.Header = make(nats.Header, len(msg.Header))
|
|
|
|
for k, v := range msg.Header {
|
|
|
|
rec.Header.Add(k, v)
|
|
|
|
}
|
2021-09-10 23:55:08 +03:00
|
|
|
} else {
|
2021-09-23 08:11:37 +03:00
|
|
|
rec.Data, err = n.opts.Codec.Marshal(msg)
|
2021-09-10 23:55:08 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
2021-09-10 23:55:08 +03:00
|
|
|
|
|
|
|
n.RLock()
|
|
|
|
defer n.RUnlock()
|
2021-09-23 08:11:37 +03:00
|
|
|
|
|
|
|
return n.conn.PublishMsg(rec)
|
2021-09-10 23:55:08 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
|
|
return nil, nil
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
|
|
n.RLock()
|
|
|
|
if n.conn == nil {
|
|
|
|
n.RUnlock()
|
|
|
|
return nil, errors.New("not connected")
|
|
|
|
}
|
|
|
|
n.RUnlock()
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
options := broker.NewSubscribeOptions(opts...)
|
2021-01-29 18:20:06 +03:00
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
eh := n.opts.ErrorHandler
|
|
|
|
if options.ErrorHandler != nil {
|
|
|
|
eh = options.ErrorHandler
|
|
|
|
}
|
2021-01-29 18:20:06 +03:00
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
fn := func(msg *nats.Msg) {
|
|
|
|
pub := pPool.Get().(*publication)
|
|
|
|
pub.msg.Header = nil
|
|
|
|
pub.msg.Body = nil
|
|
|
|
pub.topic = msg.Subject
|
|
|
|
pub.err = nil
|
2024-10-12 13:13:52 +03:00
|
|
|
pub.ctx = ctx
|
2021-09-23 08:11:37 +03:00
|
|
|
|
|
|
|
if options.BodyOnly {
|
|
|
|
pub.msg.Body = msg.Data
|
|
|
|
} else if n.opts.Codec.String() == "noop" {
|
|
|
|
pub.msg.Body = msg.Data
|
|
|
|
pub.msg.Header = metadata.New(len(msg.Header))
|
|
|
|
for k, v := range msg.Header {
|
|
|
|
pub.msg.Header.Set(k, strings.Join(v, ","))
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
err := n.opts.Codec.Unmarshal(msg.Data, pub.msg)
|
|
|
|
pub.err = err
|
|
|
|
if err != nil {
|
|
|
|
pub.msg.Body = msg.Data
|
|
|
|
if eh != nil {
|
|
|
|
eh(pub)
|
|
|
|
} else {
|
|
|
|
if n.opts.Logger.V(logger.ErrorLevel) {
|
2024-10-12 13:13:52 +03:00
|
|
|
n.opts.Logger.Error(n.opts.Context, "handler error", err)
|
2021-09-23 08:11:37 +03:00
|
|
|
}
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
2021-09-23 08:11:37 +03:00
|
|
|
return
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := handler(pub); err != nil {
|
|
|
|
pub.err = err
|
|
|
|
if eh != nil {
|
|
|
|
eh(pub)
|
|
|
|
} else {
|
|
|
|
if n.opts.Logger.V(logger.ErrorLevel) {
|
2024-10-12 13:13:52 +03:00
|
|
|
n.opts.Logger.Error(n.opts.Context, "handler error", err)
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var sub *nats.Subscription
|
|
|
|
var err error
|
|
|
|
|
|
|
|
n.RLock()
|
2021-09-23 08:11:37 +03:00
|
|
|
if len(options.Group) > 0 {
|
|
|
|
sub, err = n.conn.QueueSubscribe(topic, options.Group, fn)
|
2021-01-29 18:20:06 +03:00
|
|
|
} else {
|
|
|
|
sub, err = n.conn.Subscribe(topic, fn)
|
|
|
|
}
|
|
|
|
n.RUnlock()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-09-23 08:11:37 +03:00
|
|
|
return &subscriber{s: sub, opts: options}, nil
|
2021-01-29 18:20:06 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) String() string {
|
|
|
|
return "nats"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) Name() string {
|
|
|
|
return n.opts.Name
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) setOption(opts ...broker.Option) {
|
|
|
|
for _, o := range opts {
|
|
|
|
o(&n.opts)
|
|
|
|
}
|
|
|
|
|
|
|
|
n.Once.Do(func() {
|
|
|
|
n.nopts = nats.GetDefaultOptions()
|
|
|
|
})
|
|
|
|
|
|
|
|
if nopts, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
|
|
|
|
n.nopts = nopts
|
|
|
|
}
|
|
|
|
|
|
|
|
// broker.Options have higher priority than nats.Options
|
|
|
|
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
|
|
|
|
// we read them from nats.Option
|
|
|
|
if len(n.opts.Addrs) == 0 {
|
|
|
|
n.opts.Addrs = n.nopts.Servers
|
|
|
|
}
|
|
|
|
|
|
|
|
if n.opts.TLSConfig == nil {
|
|
|
|
n.opts.TLSConfig = n.nopts.TLSConfig
|
|
|
|
}
|
|
|
|
n.addrs = n.setAddrs(n.opts.Addrs)
|
|
|
|
|
|
|
|
if n.opts.Context.Value(drainConnectionKey{}) != nil {
|
|
|
|
n.drain = true
|
|
|
|
n.closeCh = make(chan error)
|
|
|
|
n.nopts.ClosedCB = n.onClose
|
|
|
|
n.nopts.AsyncErrorCB = n.onAsyncError
|
|
|
|
n.nopts.DisconnectedErrCB = n.onDisconnectedError
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) onClose(conn *nats.Conn) {
|
|
|
|
n.closeCh <- nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
|
|
|
|
// There are kinds of different async error nats might callback, but we are interested
|
|
|
|
// in ErrDrainTimeout only here.
|
|
|
|
if err == nats.ErrDrainTimeout {
|
|
|
|
n.closeCh <- err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (n *natsBroker) onDisconnectedError(conn *nats.Conn, err error) {
|
|
|
|
n.closeCh <- err
|
2015-11-25 03:16:12 +03:00
|
|
|
}
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
func NewBroker(opts ...broker.Option) *natsBroker {
|
2021-01-29 18:20:06 +03:00
|
|
|
options := broker.NewOptions(opts...)
|
|
|
|
|
2021-09-23 08:11:37 +03:00
|
|
|
if options.Codec.String() != "noop" {
|
2024-10-12 13:13:52 +03:00
|
|
|
options.Logger.Info(options.Context, "broker codec not noop, disable plain nats headers usage")
|
2021-09-23 08:11:37 +03:00
|
|
|
}
|
|
|
|
|
2021-01-29 18:20:06 +03:00
|
|
|
n := &natsBroker{
|
|
|
|
opts: options,
|
|
|
|
}
|
|
|
|
|
|
|
|
n.setOption(opts...)
|
|
|
|
|
|
|
|
return n
|
2015-11-25 03:16:12 +03:00
|
|
|
}
|