246 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			246 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package nats provides a NATS broker
 | |
| package nats
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/micro/go-micro/broker"
 | |
| 	"github.com/micro/go-micro/codec/json"
 | |
| 	nats "github.com/nats-io/nats.go"
 | |
| )
 | |
| 
 | |
| type natsBroker struct {
 | |
| 	sync.RWMutex
 | |
| 	addrs []string
 | |
| 	conn  *nats.Conn
 | |
| 	opts  broker.Options
 | |
| 	nopts nats.Options
 | |
| 	drain bool
 | |
| }
 | |
| 
 | |
| type subscriber struct {
 | |
| 	s     *nats.Subscription
 | |
| 	opts  broker.SubscribeOptions
 | |
| 	drain bool
 | |
| }
 | |
| 
 | |
| type publication struct {
 | |
| 	t string
 | |
| 	m *broker.Message
 | |
| }
 | |
| 
 | |
| func (p *publication) Topic() string {
 | |
| 	return p.t
 | |
| }
 | |
| 
 | |
| func (p *publication) Message() *broker.Message {
 | |
| 	return p.m
 | |
| }
 | |
| 
 | |
| func (p *publication) Ack() error {
 | |
| 	// nats does not support acking
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *subscriber) Options() broker.SubscribeOptions {
 | |
| 	return s.opts
 | |
| }
 | |
| 
 | |
| func (s *subscriber) Topic() string {
 | |
| 	return s.s.Subject
 | |
| }
 | |
| 
 | |
| func (s *subscriber) Unsubscribe() error {
 | |
| 	if s.drain {
 | |
| 		return s.s.Drain()
 | |
| 	}
 | |
| 	return s.s.Unsubscribe()
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Address() string {
 | |
| 	if n.conn != nil && n.conn.IsConnected() {
 | |
| 		return n.conn.ConnectedUrl()
 | |
| 	}
 | |
| 	if len(n.addrs) > 0 {
 | |
| 		return n.addrs[0]
 | |
| 	}
 | |
| 
 | |
| 	return ""
 | |
| }
 | |
| 
 | |
| func setAddrs(addrs []string) []string {
 | |
| 	var cAddrs []string
 | |
| 	for _, addr := range addrs {
 | |
| 		if len(addr) == 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 		if !strings.HasPrefix(addr, "nats://") {
 | |
| 			addr = "nats://" + addr
 | |
| 		}
 | |
| 		cAddrs = append(cAddrs, addr)
 | |
| 	}
 | |
| 	if len(cAddrs) == 0 {
 | |
| 		cAddrs = []string{nats.DefaultURL}
 | |
| 	}
 | |
| 	return cAddrs
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Connect() error {
 | |
| 	n.Lock()
 | |
| 	defer n.Unlock()
 | |
| 
 | |
| 	status := nats.CLOSED
 | |
| 	if n.conn != nil {
 | |
| 		status = n.conn.Status()
 | |
| 	}
 | |
| 
 | |
| 	switch status {
 | |
| 	case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
 | |
| 		return nil
 | |
| 	default: // DISCONNECTED or CLOSED or DRAINING
 | |
| 		opts := n.nopts
 | |
| 		opts.Servers = n.addrs
 | |
| 		opts.Secure = n.opts.Secure
 | |
| 		opts.TLSConfig = n.opts.TLSConfig
 | |
| 
 | |
| 		// secure might not be set
 | |
| 		if n.opts.TLSConfig != nil {
 | |
| 			opts.Secure = true
 | |
| 		}
 | |
| 
 | |
| 		c, err := opts.Connect()
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		n.conn = c
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Disconnect() error {
 | |
| 	n.RLock()
 | |
| 	if n.drain {
 | |
| 		n.conn.Drain()
 | |
| 	} else {
 | |
| 		n.conn.Close()
 | |
| 	}
 | |
| 	n.RUnlock()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Init(opts ...broker.Option) error {
 | |
| 	for _, o := range opts {
 | |
| 		o(&n.opts)
 | |
| 	}
 | |
| 	n.addrs = setAddrs(n.opts.Addrs)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Options() broker.Options {
 | |
| 	return n.opts
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
 | |
| 	b, err := n.opts.Codec.Marshal(msg)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	n.RLock()
 | |
| 	defer n.RUnlock()
 | |
| 	return n.conn.Publish(topic, b)
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | |
| 	if n.conn == nil {
 | |
| 		return nil, errors.New("not connected")
 | |
| 	}
 | |
| 
 | |
| 	opt := broker.SubscribeOptions{
 | |
| 		AutoAck: true,
 | |
| 		Context: context.Background(),
 | |
| 	}
 | |
| 
 | |
| 	for _, o := range opts {
 | |
| 		o(&opt)
 | |
| 	}
 | |
| 
 | |
| 	var drain bool
 | |
| 	if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok {
 | |
| 		drain = true
 | |
| 	}
 | |
| 
 | |
| 	fn := func(msg *nats.Msg) {
 | |
| 		var m broker.Message
 | |
| 		if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		handler(&publication{m: &m, t: msg.Subject})
 | |
| 	}
 | |
| 
 | |
| 	var sub *nats.Subscription
 | |
| 	var err error
 | |
| 
 | |
| 	n.RLock()
 | |
| 	if len(opt.Queue) > 0 {
 | |
| 		sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
 | |
| 	} else {
 | |
| 		sub, err = n.conn.Subscribe(topic, fn)
 | |
| 	}
 | |
| 	n.RUnlock()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &subscriber{s: sub, opts: opt, drain: drain}, nil
 | |
| }
 | |
| 
 | |
| func (n *natsBroker) String() string {
 | |
| 	return "nats"
 | |
| }
 | |
| 
 | |
| func NewBroker(opts ...broker.Option) broker.Broker {
 | |
| 	options := broker.Options{
 | |
| 		// Default codec
 | |
| 		Codec:   json.Marshaler{},
 | |
| 		Context: context.Background(),
 | |
| 	}
 | |
| 
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	natsOpts := nats.GetDefaultOptions()
 | |
| 	if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok {
 | |
| 		natsOpts = n
 | |
| 	}
 | |
| 
 | |
| 	var drain bool
 | |
| 	if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok {
 | |
| 		drain = true
 | |
| 	}
 | |
| 
 | |
| 	// broker.Options have higher priority than nats.Options
 | |
| 	// only if Addrs, Secure or TLSConfig were not set through a broker.Option
 | |
| 	// we read them from nats.Option
 | |
| 	if len(options.Addrs) == 0 {
 | |
| 		options.Addrs = natsOpts.Servers
 | |
| 	}
 | |
| 
 | |
| 	if !options.Secure {
 | |
| 		options.Secure = natsOpts.Secure
 | |
| 	}
 | |
| 
 | |
| 	if options.TLSConfig == nil {
 | |
| 		options.TLSConfig = natsOpts.TLSConfig
 | |
| 	}
 | |
| 
 | |
| 	return &natsBroker{
 | |
| 		opts:  options,
 | |
| 		nopts: natsOpts,
 | |
| 		addrs: setAddrs(options.Addrs),
 | |
| 		drain: drain,
 | |
| 	}
 | |
| }
 |