252 lines
4.5 KiB
Go
Raw Normal View History

2017-01-01 20:29:12 +00:00
// Package nats provides a NATS broker
2015-11-25 00:16:12 +00:00
package nats
import (
2018-03-03 12:28:44 +00:00
"context"
"errors"
2015-11-25 00:16:12 +00:00
"strings"
2018-11-05 06:39:28 +00:00
"sync"
2015-11-25 00:16:12 +00:00
"github.com/micro/go-micro/broker"
2019-06-21 13:50:38 +01:00
"github.com/micro/go-micro/config/cmd"
2019-01-10 09:43:37 +00:00
"github.com/micro/go-micro/codec/json"
nats "github.com/nats-io/nats.go"
2015-11-25 00:16:12 +00:00
)
type nbroker struct {
2018-11-05 06:39:28 +00:00
sync.RWMutex
2015-11-25 00:16:12 +00:00
addrs []string
conn *nats.Conn
2015-12-31 18:23:15 +00:00
opts broker.Options
2018-01-05 20:18:00 +01:00
nopts nats.Options
drain bool
2015-11-25 00:16:12 +00:00
}
type subscriber struct {
s *nats.Subscription
opts broker.SubscribeOptions
drain bool
}
type publication struct {
t string
m *broker.Message
2015-11-25 00:16:12 +00:00
}
func init() {
2016-01-02 19:25:20 +00:00
cmd.DefaultBrokers["nats"] = NewBroker
2015-11-25 00:16:12 +00:00
}
func (n *publication) Topic() string {
return n.t
}
func (n *publication) Message() *broker.Message {
return n.m
}
func (n *publication) Ack() error {
return nil
}
2015-12-31 18:23:15 +00:00
func (n *subscriber) Options() broker.SubscribeOptions {
return n.opts
}
2015-11-25 00:16:12 +00:00
func (n *subscriber) Topic() string {
return n.s.Subject
}
func (n *subscriber) Unsubscribe() error {
if n.drain {
return n.s.Drain()
}
return n.s.Unsubscribe()
2015-11-25 00:16:12 +00:00
}
func (n *nbroker) Address() string {
if n.conn != nil && n.conn.IsConnected() {
return n.conn.ConnectedUrl()
}
2015-11-25 00:16:12 +00:00
if len(n.addrs) > 0 {
return n.addrs[0]
}
2015-11-25 00:16:12 +00:00
return ""
}
func setAddrs(addrs []string) []string {
var cAddrs []string
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if !strings.HasPrefix(addr, "nats://") {
addr = "nats://" + addr
}
cAddrs = append(cAddrs, addr)
}
if len(cAddrs) == 0 {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}
2015-11-25 00:16:12 +00:00
func (n *nbroker) Connect() error {
2019-03-13 19:58:19 +08:00
n.Lock()
defer n.Unlock()
2016-01-16 23:11:20 +00:00
2019-03-13 19:58:19 +08:00
status := nats.CLOSED
if n.conn != nil {
status = n.conn.Status()
2016-01-17 00:37:55 +00:00
}
2019-03-13 19:58:19 +08:00
switch status {
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
return nil
default: // DISCONNECTED or CLOSED or DRAINING
opts := n.nopts
opts.Servers = n.addrs
opts.Secure = n.opts.Secure
opts.TLSConfig = n.opts.TLSConfig
// secure might not be set
if n.opts.TLSConfig != nil {
opts.Secure = true
}
c, err := opts.Connect()
if err != nil {
return err
}
n.conn = c
return nil
2015-11-25 00:16:12 +00:00
}
}
func (n *nbroker) Disconnect() error {
2018-11-05 06:39:28 +00:00
n.RLock()
if n.drain {
n.conn.Drain()
} else {
n.conn.Close()
}
2018-11-05 06:39:28 +00:00
n.RUnlock()
2015-11-25 00:16:12 +00:00
return nil
}
func (n *nbroker) Init(opts ...broker.Option) error {
2016-01-16 23:11:20 +00:00
for _, o := range opts {
o(&n.opts)
}
n.addrs = setAddrs(n.opts.Addrs)
2015-11-25 00:16:12 +00:00
return nil
}
2015-12-31 18:23:15 +00:00
func (n *nbroker) Options() broker.Options {
return n.opts
}
func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
b, err := n.opts.Codec.Marshal(msg)
2015-11-25 00:16:12 +00:00
if err != nil {
return err
}
2018-11-05 06:39:28 +00:00
n.RLock()
defer n.RUnlock()
2015-11-25 00:16:12 +00:00
return n.conn.Publish(topic, b)
}
func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
if n.conn == nil {
return nil, errors.New("not connected")
}
opt := broker.SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
var drain bool
if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok {
drain = true
}
fn := func(msg *nats.Msg) {
2016-12-06 20:35:00 +00:00
var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
2015-11-25 00:16:12 +00:00
return
}
2017-01-12 20:21:49 +00:00
handler(&publication{m: &m, t: msg.Subject})
}
var sub *nats.Subscription
var err error
2018-11-05 06:39:28 +00:00
n.RLock()
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
2018-11-05 06:39:28 +00:00
n.RUnlock()
2015-11-25 00:16:12 +00:00
if err != nil {
return nil, err
}
return &subscriber{s: sub, opts: opt, drain: drain}, nil
2015-11-25 00:16:12 +00:00
}
2015-12-19 21:56:42 +00:00
func (n *nbroker) String() string {
return "nats"
}
2016-03-16 10:08:10 +00:00
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
2019-01-10 09:42:14 +00:00
Codec: json.Marshaler{},
Context: context.Background(),
}
2016-03-16 10:08:10 +00:00
for _, o := range opts {
o(&options)
}
2018-01-06 15:19:15 +00:00
natsOpts := nats.GetDefaultOptions()
if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok {
2018-01-06 15:19:15 +00:00
natsOpts = n
}
var drain bool
if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok {
drain = true
}
2018-01-05 20:18:00 +01:00
// broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option
if len(options.Addrs) == 0 {
2018-01-06 15:19:15 +00:00
options.Addrs = natsOpts.Servers
2018-01-05 20:18:00 +01:00
}
if !options.Secure {
2018-01-06 15:19:15 +00:00
options.Secure = natsOpts.Secure
2018-01-05 20:18:00 +01:00
}
if options.TLSConfig == nil {
2018-01-06 15:19:15 +00:00
options.TLSConfig = natsOpts.TLSConfig
2018-01-05 20:18:00 +01:00
}
nb := &nbroker{
2016-05-16 11:57:37 +01:00
opts: options,
2018-01-06 15:19:15 +00:00
nopts: natsOpts,
2018-01-05 20:18:00 +01:00
addrs: setAddrs(options.Addrs),
drain: drain,
2015-11-25 00:16:12 +00:00
}
2018-01-05 20:18:00 +01:00
return nb
2015-11-25 00:16:12 +00:00
}