Add nats broker as a default
This commit is contained in:
		
							
								
								
									
										245
									
								
								broker/nats/nats.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										245
									
								
								broker/nats/nats.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,245 @@ | ||||
| // Package nats provides a NATS broker | ||||
| package nats | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/micro/go-micro/broker" | ||||
| 	"github.com/micro/go-micro/codec/json" | ||||
| 	nats "github.com/nats-io/nats.go" | ||||
| ) | ||||
|  | ||||
| type natsBroker struct { | ||||
| 	sync.RWMutex | ||||
| 	addrs []string | ||||
| 	conn  *nats.Conn | ||||
| 	opts  broker.Options | ||||
| 	nopts nats.Options | ||||
| 	drain bool | ||||
| } | ||||
|  | ||||
| type subscriber struct { | ||||
| 	s     *nats.Subscription | ||||
| 	opts  broker.SubscribeOptions | ||||
| 	drain bool | ||||
| } | ||||
|  | ||||
| type publication struct { | ||||
| 	t string | ||||
| 	m *broker.Message | ||||
| } | ||||
|  | ||||
| func (p *publication) Topic() string { | ||||
| 	return p.t | ||||
| } | ||||
|  | ||||
| func (p *publication) Message() *broker.Message { | ||||
| 	return p.m | ||||
| } | ||||
|  | ||||
| func (p *publication) Ack() error { | ||||
| 	// nats does not support acking | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *subscriber) Options() broker.SubscribeOptions { | ||||
| 	return s.opts | ||||
| } | ||||
|  | ||||
| func (s *subscriber) Topic() string { | ||||
| 	return s.s.Subject | ||||
| } | ||||
|  | ||||
| func (s *subscriber) Unsubscribe() error { | ||||
| 	if s.drain { | ||||
| 		return s.s.Drain() | ||||
| 	} | ||||
| 	return s.s.Unsubscribe() | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Address() string { | ||||
| 	if n.conn != nil && n.conn.IsConnected() { | ||||
| 		return n.conn.ConnectedUrl() | ||||
| 	} | ||||
| 	if len(n.addrs) > 0 { | ||||
| 		return n.addrs[0] | ||||
| 	} | ||||
|  | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| func setAddrs(addrs []string) []string { | ||||
| 	var cAddrs []string | ||||
| 	for _, addr := range addrs { | ||||
| 		if len(addr) == 0 { | ||||
| 			continue | ||||
| 		} | ||||
| 		if !strings.HasPrefix(addr, "nats://") { | ||||
| 			addr = "nats://" + addr | ||||
| 		} | ||||
| 		cAddrs = append(cAddrs, addr) | ||||
| 	} | ||||
| 	if len(cAddrs) == 0 { | ||||
| 		cAddrs = []string{nats.DefaultURL} | ||||
| 	} | ||||
| 	return cAddrs | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Connect() error { | ||||
| 	n.Lock() | ||||
| 	defer n.Unlock() | ||||
|  | ||||
| 	status := nats.CLOSED | ||||
| 	if n.conn != nil { | ||||
| 		status = n.conn.Status() | ||||
| 	} | ||||
|  | ||||
| 	switch status { | ||||
| 	case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING: | ||||
| 		return nil | ||||
| 	default: // DISCONNECTED or CLOSED or DRAINING | ||||
| 		opts := n.nopts | ||||
| 		opts.Servers = n.addrs | ||||
| 		opts.Secure = n.opts.Secure | ||||
| 		opts.TLSConfig = n.opts.TLSConfig | ||||
|  | ||||
| 		// secure might not be set | ||||
| 		if n.opts.TLSConfig != nil { | ||||
| 			opts.Secure = true | ||||
| 		} | ||||
|  | ||||
| 		c, err := opts.Connect() | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		n.conn = c | ||||
| 		return nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Disconnect() error { | ||||
| 	n.RLock() | ||||
| 	if n.drain { | ||||
| 		n.conn.Drain() | ||||
| 	} else { | ||||
| 		n.conn.Close() | ||||
| 	} | ||||
| 	n.RUnlock() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Init(opts ...broker.Option) error { | ||||
| 	for _, o := range opts { | ||||
| 		o(&n.opts) | ||||
| 	} | ||||
| 	n.addrs = setAddrs(n.opts.Addrs) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Options() broker.Options { | ||||
| 	return n.opts | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { | ||||
| 	b, err := n.opts.Codec.Marshal(msg) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	n.RLock() | ||||
| 	defer n.RUnlock() | ||||
| 	return n.conn.Publish(topic, b) | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { | ||||
| 	if n.conn == nil { | ||||
| 		return nil, errors.New("not connected") | ||||
| 	} | ||||
|  | ||||
| 	opt := broker.SubscribeOptions{ | ||||
| 		AutoAck: true, | ||||
| 		Context: context.Background(), | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&opt) | ||||
| 	} | ||||
|  | ||||
| 	var drain bool | ||||
| 	if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok { | ||||
| 		drain = true | ||||
| 	} | ||||
|  | ||||
| 	fn := func(msg *nats.Msg) { | ||||
| 		var m broker.Message | ||||
| 		if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { | ||||
| 			return | ||||
| 		} | ||||
| 		handler(&publication{m: &m, t: msg.Subject}) | ||||
| 	} | ||||
|  | ||||
| 	var sub *nats.Subscription | ||||
| 	var err error | ||||
|  | ||||
| 	n.RLock() | ||||
| 	if len(opt.Queue) > 0 { | ||||
| 		sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn) | ||||
| 	} else { | ||||
| 		sub, err = n.conn.Subscribe(topic, fn) | ||||
| 	} | ||||
| 	n.RUnlock() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &subscriber{s: sub, opts: opt, drain: drain}, nil | ||||
| } | ||||
|  | ||||
| func (n *natsBroker) String() string { | ||||
| 	return "nats" | ||||
| } | ||||
|  | ||||
| func NewBroker(opts ...broker.Option) broker.Broker { | ||||
| 	options := broker.Options{ | ||||
| 		// Default codec | ||||
| 		Codec:   json.Marshaler{}, | ||||
| 		Context: context.Background(), | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	natsOpts := nats.GetDefaultOptions() | ||||
| 	if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok { | ||||
| 		natsOpts = n | ||||
| 	} | ||||
|  | ||||
| 	var drain bool | ||||
| 	if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok { | ||||
| 		drain = true | ||||
| 	} | ||||
|  | ||||
| 	// broker.Options have higher priority than nats.Options | ||||
| 	// only if Addrs, Secure or TLSConfig were not set through a broker.Option | ||||
| 	// we read them from nats.Option | ||||
| 	if len(options.Addrs) == 0 { | ||||
| 		options.Addrs = natsOpts.Servers | ||||
| 	} | ||||
|  | ||||
| 	if !options.Secure { | ||||
| 		options.Secure = natsOpts.Secure | ||||
| 	} | ||||
|  | ||||
| 	if options.TLSConfig == nil { | ||||
| 		options.TLSConfig = natsOpts.TLSConfig | ||||
| 	} | ||||
|  | ||||
| 	return &natsBroker{ | ||||
| 		opts:  options, | ||||
| 		nopts: natsOpts, | ||||
| 		addrs: setAddrs(options.Addrs), | ||||
| 		drain: drain, | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user