micro/broker/nats/nats.go
magodo 0baea58938 wait nats drain since it's asynchronous
1. nats subscription draining is removed, since it is asynchronous,
   and there is no reliable way to detect when it is finished.
   Remove this option to avoid confusion.
2. nats connection draining is kept, and use 2 callbacks to detect
   draining timeout (timeout is set via `nats.Options`) or finish.
3. Also honour options passed in `broker.Init()` (previously only
   `broker.New()` is honoured).
2019-08-07 17:58:45 +08:00

256 lines
4.7 KiB
Go

// Package nats provides a NATS broker
package nats
import (
"context"
"errors"
"strings"
"sync"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec/json"
nats "github.com/nats-io/nats.go"
)
type natsBroker struct {
sync.Once
sync.RWMutex
addrs []string
conn *nats.Conn
opts broker.Options
nopts nats.Options
drain bool
closeCh chan (error)
}
type subscriber struct {
s *nats.Subscription
opts broker.SubscribeOptions
}
type publication struct {
t string
m *broker.Message
}
func (p *publication) Topic() string {
return p.t
}
func (p *publication) Message() *broker.Message {
return p.m
}
func (p *publication) Ack() error {
// nats does not support acking
return nil
}
func (s *subscriber) Options() broker.SubscribeOptions {
return s.opts
}
func (s *subscriber) Topic() string {
return s.s.Subject
}
func (s *subscriber) Unsubscribe() error {
return s.s.Unsubscribe()
}
func (n *natsBroker) Address() string {
if n.conn != nil && n.conn.IsConnected() {
return n.conn.ConnectedUrl()
}
if len(n.addrs) > 0 {
return n.addrs[0]
}
return ""
}
func setAddrs(addrs []string) []string {
var cAddrs []string
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if !strings.HasPrefix(addr, "nats://") {
addr = "nats://" + addr
}
cAddrs = append(cAddrs, addr)
}
if len(cAddrs) == 0 {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}
func (n *natsBroker) Connect() error {
n.Lock()
defer n.Unlock()
status := nats.CLOSED
if n.conn != nil {
status = n.conn.Status()
}
switch status {
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
return nil
default: // DISCONNECTED or CLOSED or DRAINING
opts := n.nopts
opts.Servers = n.addrs
opts.Secure = n.opts.Secure
opts.TLSConfig = n.opts.TLSConfig
// secure might not be set
if n.opts.TLSConfig != nil {
opts.Secure = true
}
c, err := opts.Connect()
if err != nil {
return err
}
n.conn = c
return nil
}
}
func (n *natsBroker) Disconnect() error {
n.RLock()
defer n.RUnlock()
if n.drain {
n.conn.Drain()
return <-n.closeCh
}
n.conn.Close()
return nil
}
func (n *natsBroker) Init(opts ...broker.Option) error {
n.setOption(opts...)
return nil
}
func (n *natsBroker) Options() broker.Options {
return n.opts
}
func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
b, err := n.opts.Codec.Marshal(msg)
if err != nil {
return err
}
n.RLock()
defer n.RUnlock()
return n.conn.Publish(topic, b)
}
func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
if n.conn == nil {
return nil, errors.New("not connected")
}
opt := broker.SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
fn := func(msg *nats.Msg) {
var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
return
}
handler(&publication{m: &m, t: msg.Subject})
}
var sub *nats.Subscription
var err error
n.RLock()
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
n.RUnlock()
if err != nil {
return nil, err
}
return &subscriber{s: sub, opts: opt}, nil
}
func (n *natsBroker) String() string {
return "nats"
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
Codec: json.Marshaler{},
Context: context.Background(),
}
n := &natsBroker{
opts: options,
}
n.setOption(opts...)
return n
}
func (n *natsBroker) setOption(opts ...broker.Option) {
for _, o := range opts {
o(&n.opts)
}
n.Once.Do(func() {
n.nopts = nats.GetDefaultOptions()
})
if nopts, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
n.nopts = nopts
}
// broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option
if len(n.opts.Addrs) == 0 {
n.opts.Addrs = n.nopts.Servers
}
if !n.opts.Secure {
n.opts.Secure = n.nopts.Secure
}
if n.opts.TLSConfig == nil {
n.opts.TLSConfig = n.nopts.TLSConfig
}
n.addrs = setAddrs(n.opts.Addrs)
if n.opts.Context.Value(drainConnectionKey{}) != nil {
n.drain = true
n.closeCh = make(chan error)
n.nopts.ClosedCB = n.onClose
n.nopts.AsyncErrorCB = n.onAsyncError
}
}
func (n *natsBroker) onClose(conn *nats.Conn) {
n.closeCh <- nil
}
func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
// There are kinds of different async error nats might callback, but we are interested
// in ErrDrainTimeout only here.
if err == nats.ErrDrainTimeout {
n.closeCh <- err
}
}