micro-broker-nats/nats.go

157 lines
2.6 KiB
Go
Raw Normal View History

2015-11-25 03:16:12 +03:00
package nats
import (
"encoding/json"
"strings"
"github.com/apcera/nats"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/cmd"
)
type nbroker struct {
addrs []string
conn *nats.Conn
2015-12-31 21:23:15 +03:00
opts broker.Options
2015-11-25 03:16:12 +03:00
}
// subscriber wraps a NATS subscription together with the options it was
// created with.
type subscriber struct {
	// s is the underlying NATS subscription.
	s *nats.Subscription
	// opts are the subscribe options in effect for this subscription.
	opts broker.SubscribeOptions
}
type publication struct {
t string
m *broker.Message
2015-11-25 03:16:12 +03:00
}
func init() {
2016-01-02 22:25:20 +03:00
cmd.DefaultBrokers["nats"] = NewBroker
2015-11-25 03:16:12 +03:00
}
// Topic returns the topic stored on the publication.
func (n *publication) Topic() string {
	return n.t
}
// Message returns the broker message stored on the publication.
func (n *publication) Message() *broker.Message {
	return n.m
}
// Ack is a no-op for this broker and always returns nil.
func (n *publication) Ack() error {
	return nil
}
2015-12-31 21:23:15 +03:00
// Options returns the subscribe options the subscriber was created with.
func (n *subscriber) Options() broker.SubscribeOptions {
	return n.opts
}
2015-11-25 03:16:12 +03:00
// Topic returns the subject of the underlying NATS subscription.
func (n *subscriber) Topic() string {
	return n.s.Subject
}
// Unsubscribe cancels the underlying NATS subscription and returns whatever
// error the NATS client reports.
func (n *subscriber) Unsubscribe() error {
	return n.s.Unsubscribe()
}
// Address returns the first configured server address, or the empty string
// when no addresses are configured.
func (n *nbroker) Address() string {
	if len(n.addrs) == 0 {
		return ""
	}
	return n.addrs[0]
}
func (n *nbroker) Connect() error {
if n.conn != nil {
return nil
}
opts := nats.DefaultOptions
opts.Servers = n.addrs
2016-01-17 02:11:20 +03:00
opts.Secure = n.opts.Secure
2015-11-25 03:16:12 +03:00
c, err := opts.Connect()
if err != nil {
return err
}
n.conn = c
return nil
}
// Disconnect closes the NATS connection, if one is stored, and clears it.
// It always returns nil.
func (n *nbroker) Disconnect() error {
	// Calling Disconnect before Connect (or twice in a row) must not
	// dereference a nil connection.
	if n.conn == nil {
		return nil
	}

	n.conn.Close()

	// Connect returns early while conn is non-nil, so drop the closed handle
	// to let a later Connect dial again.
	n.conn = nil
	return nil
}
func (n *nbroker) Init(opts ...broker.Option) error {
2016-01-17 02:11:20 +03:00
for _, o := range opts {
o(&n.opts)
}
2015-11-25 03:16:12 +03:00
return nil
}
2015-12-31 21:23:15 +03:00
// Options returns the broker's stored options.
func (n *nbroker) Options() broker.Options {
	return n.opts
}
func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
2015-11-25 03:16:12 +03:00
b, err := json.Marshal(msg)
if err != nil {
return err
}
return n.conn.Publish(topic, b)
}
func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
opt := broker.SubscribeOptions{
AutoAck: true,
}
for _, o := range opts {
o(&opt)
}
fn := func(msg *nats.Msg) {
2015-11-25 03:16:12 +03:00
var m *broker.Message
if err := json.Unmarshal(msg.Data, &m); err != nil {
return
}
handler(&publication{m: m, t: topic})
}
var sub *nats.Subscription
var err error
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
2015-11-25 03:16:12 +03:00
if err != nil {
return nil, err
}
return &subscriber{s: sub, opts: opt}, nil
2015-11-25 03:16:12 +03:00
}
2015-12-20 00:56:42 +03:00
// String returns the broker's name, "nats".
func (n *nbroker) String() string {
	return "nats"
}
2015-11-25 03:16:12 +03:00
// NewBroker returns a NATS-backed broker.Broker for the given server
// addresses.
//
// Empty addresses are skipped, and any address lacking the "nats://" prefix
// has it prepended. When no usable address remains, nats.DefaultURL is used.
// Every option in opt is applied to the broker's options, so settings passed
// at construction time take effect without a separate Init call.
func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
	var cAddrs []string
	for _, addr := range addrs {
		if len(addr) == 0 {
			continue
		}
		if !strings.HasPrefix(addr, "nats://") {
			addr = "nats://" + addr
		}
		cAddrs = append(cAddrs, addr)
	}
	if len(cAddrs) == 0 {
		cAddrs = []string{nats.DefaultURL}
	}

	// Apply the construction-time options; previously they were accepted
	// but silently discarded.
	var options broker.Options
	for _, o := range opt {
		o(&options)
	}

	return &nbroker{
		addrs: cAddrs,
		opts:  options,
	}
}