diff --git a/broker/nats/context.go b/broker/nats/context.go new file mode 100644 index 00000000..0f009e8b --- /dev/null +++ b/broker/nats/context.go @@ -0,0 +1,37 @@ +package nats + +import ( + "context" + + "github.com/micro/go-micro/broker" +) + +// setSubscribeOption returns a function to setup a context with given value +func setSubscribeOption(k, v interface{}) broker.SubscribeOption { + return func(o *broker.SubscribeOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// setBrokerOption returns a function to setup a context with given value +func setBrokerOption(k, v interface{}) broker.Option { + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// setPublishOption returns a function to setup a context with given value +func setPublishOption(k, v interface{}) broker.PublishOption { + return func(o *broker.PublishOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/broker/nats/nats.go b/broker/nats/nats.go new file mode 100644 index 00000000..027f1dac --- /dev/null +++ b/broker/nats/nats.go @@ -0,0 +1,245 @@ +// Package nats provides a NATS broker +package nats + +import ( + "context" + "errors" + "strings" + "sync" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec/json" + nats "github.com/nats-io/nats.go" +) + +type natsBroker struct { + sync.RWMutex + addrs []string + conn *nats.Conn + opts broker.Options + nopts nats.Options + drain bool +} + +type subscriber struct { + s *nats.Subscription + opts broker.SubscribeOptions + drain bool +} + +type publication struct { + t string + m *broker.Message +} + +func (p *publication) Topic() string { + return p.t +} + +func (p *publication) Message() *broker.Message { + return p.m +} + +func (p *publication) Ack() error { + // nats does not support acking + return nil +} + +func (s *subscriber) Options() broker.SubscribeOptions { + return s.opts +} + +func (s *subscriber) Topic() string { + return s.s.Subject +} + +func (s *subscriber) Unsubscribe() error { + if s.drain { + return s.s.Drain() + } + return s.s.Unsubscribe() +} + +func (n *natsBroker) Address() string { + if n.conn != nil && n.conn.IsConnected() { + return n.conn.ConnectedUrl() + } + if len(n.addrs) > 0 { + return n.addrs[0] + } + + return "" +} + +func setAddrs(addrs []string) []string { + var cAddrs []string + for _, addr := range addrs { + if len(addr) == 0 { + continue + } + if !strings.HasPrefix(addr, "nats://") { + addr = "nats://" + addr + } + cAddrs = append(cAddrs, addr) + } + if len(cAddrs) == 0 { + cAddrs = []string{nats.DefaultURL} + } + return cAddrs +} + +func (n *natsBroker) Connect() error { + n.Lock() + defer n.Unlock() + + status := nats.CLOSED + if n.conn != nil { + status = n.conn.Status() + } + + switch status { + case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING: + return nil + default: // DISCONNECTED or CLOSED or DRAINING + opts := n.nopts + opts.Servers = n.addrs + opts.Secure = n.opts.Secure + opts.TLSConfig = n.opts.TLSConfig + + // secure might not be set + if n.opts.TLSConfig != nil { + opts.Secure = true + } + + c, err := opts.Connect() + if err != nil { + return err + } + n.conn = c + return nil + } +} + +func (n *natsBroker) Disconnect() error { + n.RLock() + if n.drain { + n.conn.Drain() + } else { + n.conn.Close() + } + n.RUnlock() + return nil +} + +func (n *natsBroker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&n.opts) + } + n.addrs = setAddrs(n.opts.Addrs) + return nil +} + +func (n *natsBroker) Options() broker.Options { + return n.opts +} + +func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + b, err := n.opts.Codec.Marshal(msg) + if err != nil { + return err + } + n.RLock() + defer n.RUnlock() + return n.conn.Publish(topic, b) +} + +func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + if n.conn == nil { + return nil, errors.New("not connected") + } + + opt := broker.SubscribeOptions{ + AutoAck: true, + Context: context.Background(), + } + + for _, o := range opts { + o(&opt) + } + + var drain bool + if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok { + drain = true + } + + fn := func(msg *nats.Msg) { + var m broker.Message + if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { + return + } + handler(&publication{m: &m, t: msg.Subject}) + } + + var sub *nats.Subscription + var err error + + n.RLock() + if len(opt.Queue) > 0 { + sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn) + } else { + sub, err = n.conn.Subscribe(topic, fn) + } + n.RUnlock() + if err != nil { + return nil, err + } + return &subscriber{s: sub, opts: opt, drain: drain}, nil +} + +func (n *natsBroker) String() string { + return "nats" +} + +func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + // Default codec + Codec: json.Marshaler{}, + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + natsOpts := nats.GetDefaultOptions() + if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok { + natsOpts = n + } + + var drain bool + if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok { + drain = true + } + + // broker.Options have higher priority than nats.Options + // only if Addrs, Secure or TLSConfig were not set through a broker.Option + // we read them from nats.Option + if len(options.Addrs) == 0 { + options.Addrs = natsOpts.Servers + } + + if !options.Secure { + options.Secure = natsOpts.Secure + } + + if options.TLSConfig == nil { + options.TLSConfig = natsOpts.TLSConfig + } + + return &natsBroker{ + opts: options, + nopts: natsOpts, + addrs: setAddrs(options.Addrs), + drain: drain, + } +} diff --git a/broker/nats/nats_test.go b/broker/nats/nats_test.go new file mode 100644 index 00000000..36625da2 --- /dev/null +++ b/broker/nats/nats_test.go @@ -0,0 +1,99 @@ +package nats + +import ( + "fmt" + "testing" + + "github.com/micro/go-micro/broker" + nats "github.com/nats-io/nats.go" +) + +var addrTestCases = []struct { + name string + description string + addrs map[string]string // expected address : set address +}{ + { + "brokerOpts", + "set broker addresses through a broker.Option in constructor", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "brokerInit", + "set broker addresses through a broker.Option in broker.Init()", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "natsOpts", + "set broker addresses through the nats.Option in constructor", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "default", + "check if default Address is set correctly", + map[string]string{ + "nats://127.0.0.1:4222": "", + }, + }, +} + +// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used. +func TestInitAddrs(t *testing.T) { + + for _, tc := range addrTestCases { + t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) { + + var br broker.Broker + var addrs []string + + for _, addr := range tc.addrs { + addrs = append(addrs, addr) + } + + switch tc.name { + case "brokerOpts": + // we know that there are just two addrs in the dict + br = NewBroker(broker.Addrs(addrs[0], addrs[1])) + br.Init() + case "brokerInit": + br = NewBroker() + // we know that there are just two addrs in the dict + br.Init(broker.Addrs(addrs[0], addrs[1])) + case "natsOpts": + nopts := nats.GetDefaultOptions() + nopts.Servers = addrs + br = NewBroker(Options(nopts)) + br.Init() + case "default": + br = NewBroker() + br.Init() + } + + natsBroker, ok := br.(*natsBroker) + if !ok { + t.Fatal("Expected broker to be of types *natsBroker") + } + // check if the same amount of addrs we set has actually been set, default + // have only 1 address nats://127.0.0.1:4222 (current nats code) or + // nats://localhost:4222 (older code version) + if len(natsBroker.addrs) != len(tc.addrs) && tc.name != "default" { + t.Errorf("Expected Addr count = %d, Actual Addr count = %d", + len(natsBroker.addrs), len(tc.addrs)) + } + + for _, addr := range natsBroker.addrs { + _, ok := tc.addrs[addr] + if !ok { + t.Errorf("Expected '%s' has not been set", addr) + } + } + }) + + } +} diff --git a/broker/nats/options.go b/broker/nats/options.go new file mode 100644 index 00000000..47431606 --- /dev/null +++ b/broker/nats/options.go @@ -0,0 +1,25 @@ +package nats + +import ( + "github.com/micro/go-micro/broker" + nats "github.com/nats-io/nats.go" +) + +type optionsKey struct{} +type drainConnectionKey struct{} +type drainSubscriptionKey struct{} + +// Options accepts nats.Options +func Options(opts nats.Options) broker.Option { + return setBrokerOption(optionsKey{}, opts) +} + +// DrainConnection will drain subscription on close +func DrainConnection() broker.Option { + return setBrokerOption(drainConnectionKey{}, true) +} + +// DrainSubscription will drain pending messages when unsubscribe +func DrainSubscription() broker.SubscribeOption { + return setSubscribeOption(drainSubscriptionKey{}, true) +} diff --git a/cmd/cmd.go b/cmd/cmd.go index 9a9a34ef..6d4a4a71 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -18,6 +18,7 @@ import ( "github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker/http" "github.com/micro/go-micro/broker/memory" + "github.com/micro/go-micro/broker/nats" // registries "github.com/micro/go-micro/registry" @@ -170,6 +171,7 @@ var ( DefaultBrokers = map[string]func(...broker.Option) broker.Broker{ "http": http.NewBroker, "memory": memory.NewBroker, + "nats": nats.NewBroker, } DefaultClients = map[string]func(...client.Option) client.Client{