0baea58938
1. NATS subscription draining is removed: it is asynchronous and there is no reliable way to detect when it has finished, so the option is dropped to avoid confusion. 2. NATS connection draining is kept and uses two callbacks to detect either a drain timeout (the timeout is set via `nats.Options`) or completion. 3. Options passed to `broker.Init()` are now honoured as well (previously only those passed to `broker.New()` were honoured).
256 lines
4.7 KiB
Go
256 lines
4.7 KiB
Go
// Package nats provides a broker.Broker implementation backed by NATS.
|
|
package nats
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/micro/go-micro/broker"
|
|
"github.com/micro/go-micro/codec/json"
|
|
nats "github.com/nats-io/nats.go"
|
|
)
|
|
|
|
type natsBroker struct {
|
|
sync.Once
|
|
sync.RWMutex
|
|
addrs []string
|
|
conn *nats.Conn
|
|
opts broker.Options
|
|
nopts nats.Options
|
|
drain bool
|
|
closeCh chan (error)
|
|
}
|
|
|
|
type subscriber struct {
|
|
s *nats.Subscription
|
|
opts broker.SubscribeOptions
|
|
}
|
|
|
|
type publication struct {
|
|
t string
|
|
m *broker.Message
|
|
}
|
|
|
|
func (p *publication) Topic() string {
|
|
return p.t
|
|
}
|
|
|
|
func (p *publication) Message() *broker.Message {
|
|
return p.m
|
|
}
|
|
|
|
func (p *publication) Ack() error {
|
|
// nats does not support acking
|
|
return nil
|
|
}
|
|
|
|
func (s *subscriber) Options() broker.SubscribeOptions {
|
|
return s.opts
|
|
}
|
|
|
|
func (s *subscriber) Topic() string {
|
|
return s.s.Subject
|
|
}
|
|
|
|
func (s *subscriber) Unsubscribe() error {
|
|
return s.s.Unsubscribe()
|
|
}
|
|
|
|
func (n *natsBroker) Address() string {
|
|
if n.conn != nil && n.conn.IsConnected() {
|
|
return n.conn.ConnectedUrl()
|
|
}
|
|
if len(n.addrs) > 0 {
|
|
return n.addrs[0]
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func setAddrs(addrs []string) []string {
|
|
var cAddrs []string
|
|
for _, addr := range addrs {
|
|
if len(addr) == 0 {
|
|
continue
|
|
}
|
|
if !strings.HasPrefix(addr, "nats://") {
|
|
addr = "nats://" + addr
|
|
}
|
|
cAddrs = append(cAddrs, addr)
|
|
}
|
|
if len(cAddrs) == 0 {
|
|
cAddrs = []string{nats.DefaultURL}
|
|
}
|
|
return cAddrs
|
|
}
|
|
|
|
func (n *natsBroker) Connect() error {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
status := nats.CLOSED
|
|
if n.conn != nil {
|
|
status = n.conn.Status()
|
|
}
|
|
|
|
switch status {
|
|
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
|
|
return nil
|
|
default: // DISCONNECTED or CLOSED or DRAINING
|
|
opts := n.nopts
|
|
opts.Servers = n.addrs
|
|
opts.Secure = n.opts.Secure
|
|
opts.TLSConfig = n.opts.TLSConfig
|
|
|
|
// secure might not be set
|
|
if n.opts.TLSConfig != nil {
|
|
opts.Secure = true
|
|
}
|
|
|
|
c, err := opts.Connect()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n.conn = c
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (n *natsBroker) Disconnect() error {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
if n.drain {
|
|
n.conn.Drain()
|
|
return <-n.closeCh
|
|
}
|
|
n.conn.Close()
|
|
return nil
|
|
}
|
|
|
|
func (n *natsBroker) Init(opts ...broker.Option) error {
|
|
n.setOption(opts...)
|
|
return nil
|
|
}
|
|
|
|
func (n *natsBroker) Options() broker.Options {
|
|
return n.opts
|
|
}
|
|
|
|
func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
|
b, err := n.opts.Codec.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.conn.Publish(topic, b)
|
|
}
|
|
|
|
func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
if n.conn == nil {
|
|
return nil, errors.New("not connected")
|
|
}
|
|
|
|
opt := broker.SubscribeOptions{
|
|
AutoAck: true,
|
|
Context: context.Background(),
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&opt)
|
|
}
|
|
|
|
fn := func(msg *nats.Msg) {
|
|
var m broker.Message
|
|
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
|
|
return
|
|
}
|
|
handler(&publication{m: &m, t: msg.Subject})
|
|
}
|
|
|
|
var sub *nats.Subscription
|
|
var err error
|
|
|
|
n.RLock()
|
|
if len(opt.Queue) > 0 {
|
|
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
|
|
} else {
|
|
sub, err = n.conn.Subscribe(topic, fn)
|
|
}
|
|
n.RUnlock()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &subscriber{s: sub, opts: opt}, nil
|
|
}
|
|
|
|
func (n *natsBroker) String() string {
|
|
return "nats"
|
|
}
|
|
|
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
|
options := broker.Options{
|
|
// Default codec
|
|
Codec: json.Marshaler{},
|
|
Context: context.Background(),
|
|
}
|
|
|
|
n := &natsBroker{
|
|
opts: options,
|
|
}
|
|
n.setOption(opts...)
|
|
|
|
return n
|
|
}
|
|
|
|
func (n *natsBroker) setOption(opts ...broker.Option) {
|
|
for _, o := range opts {
|
|
o(&n.opts)
|
|
}
|
|
|
|
n.Once.Do(func() {
|
|
n.nopts = nats.GetDefaultOptions()
|
|
})
|
|
|
|
if nopts, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
|
|
n.nopts = nopts
|
|
}
|
|
|
|
// broker.Options have higher priority than nats.Options
|
|
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
|
|
// we read them from nats.Option
|
|
if len(n.opts.Addrs) == 0 {
|
|
n.opts.Addrs = n.nopts.Servers
|
|
}
|
|
|
|
if !n.opts.Secure {
|
|
n.opts.Secure = n.nopts.Secure
|
|
}
|
|
|
|
if n.opts.TLSConfig == nil {
|
|
n.opts.TLSConfig = n.nopts.TLSConfig
|
|
}
|
|
n.addrs = setAddrs(n.opts.Addrs)
|
|
|
|
if n.opts.Context.Value(drainConnectionKey{}) != nil {
|
|
n.drain = true
|
|
n.closeCh = make(chan error)
|
|
n.nopts.ClosedCB = n.onClose
|
|
n.nopts.AsyncErrorCB = n.onAsyncError
|
|
}
|
|
}
|
|
|
|
func (n *natsBroker) onClose(conn *nats.Conn) {
|
|
n.closeCh <- nil
|
|
}
|
|
|
|
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
|
|
// There are kinds of different async error nats might callback, but we are interested
|
|
// in ErrDrainTimeout only here.
|
|
if err == nats.ErrDrainTimeout {
|
|
n.closeCh <- err
|
|
}
|
|
}
|