diff --git a/context.go b/context.go deleted file mode 100644 index 8869ea7..0000000 --- a/context.go +++ /dev/null @@ -1,48 +0,0 @@ -package nats - -import ( - "context" - - "github.com/micro/go-micro/v2/broker" - "github.com/micro/go-micro/v2/server" -) - -// setSubscribeOption returns a function to setup a context with given value -func setSubscribeOption(k, v interface{}) broker.SubscribeOption { - return func(o *broker.SubscribeOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// setBrokerOption returns a function to setup a context with given value -func setBrokerOption(k, v interface{}) broker.Option { - return func(o *broker.Options) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// setBrokerOption returns a function to setup a context with given value -func setServerSubscriberOption(k, v interface{}) server.SubscriberOption { - return func(o *server.SubscriberOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} - -// setPublishOption returns a function to setup a context with given value -func setPublishOption(k, v interface{}) broker.PublishOption { - return func(o *broker.PublishOptions) { - if o.Context == nil { - o.Context = context.Background() - } - o.Context = context.WithValue(o.Context, k, v) - } -} diff --git a/go.mod b/go.mod index 4c4ff37..7ac0501 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/micro/go-plugins/broker/nats/v2 go 1.13 require ( - github.com/micro/go-micro/v2 v2.1.2 + github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d github.com/nats-io/nats.go v1.9.1 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect ) diff --git a/go.sum b/go.sum index 317534e..9c982e7 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM= github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg= github.com/micro/go-micro/v2 v2.1.2 h1:r2pu9OckjG+vHD1Ttpwbsj9UnYAHnEiYa3ND1ejL6Do= github.com/micro/go-micro/v2 v2.1.2/go.mod h1:6RewFTFMI5H5CbuQymu4eS0cFtqYsdGFruMflWT36IQ= +github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d h1:Rz+SOJiYuj4mWO3Z1qj2iTt5njWFjxR8Jv7g0LkDwVc= +github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d/go.mod h1:JxIKgdCqe9hhdUOAyd2uWaCpRdSn9dWq8wnwlo8qodk= github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE= github.com/micro/mdns v0.3.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc= github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -319,6 +321,7 @@ github.com/oracle/oci-go-sdk v7.0.0+incompatible/go.mod h1:VQb79nF8Z2cwLkLS35ukw github.com/ovh/go-ovh v0.0.0-20181109152953-ba5adb4cf014/go.mod h1:joRatxRJaZBsY3JAOEMcoOp05CnZzsx4scTxi95DHyQ= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/nats.go b/nats.go index 443f844..36172aa 100644 --- a/nats.go +++ b/nats.go @@ -1,251 +1,16 @@ -// Package nats provides a NATS broker -package nats +// Package memory provides a memory broker +package memory import ( - "context" - "errors" - "strings" - "sync" - "github.com/micro/go-micro/v2/broker" - "github.com/micro/go-micro/v2/codec/json" + "github.com/micro/go-micro/v2/broker/nats" "github.com/micro/go-micro/v2/config/cmd" - nats "github.com/nats-io/nats.go" ) -type nbroker struct { - sync.RWMutex - addrs []string - conn *nats.Conn - opts broker.Options - nopts nats.Options - drain bool -} - -type subscriber struct { - s *nats.Subscription - opts broker.SubscribeOptions - drain bool -} - -type publication struct { - t string - m *broker.Message -} - func init() { cmd.DefaultBrokers["nats"] = NewBroker } -func (n *publication) Topic() string { - return n.t -} - -func (n *publication) Message() *broker.Message { - return n.m -} - -func (n *publication) Ack() error { - return nil -} - -func (n *subscriber) Options() broker.SubscribeOptions { - return n.opts -} - -func (n *subscriber) Topic() string { - return n.s.Subject -} - -func (n *subscriber) Unsubscribe() error { - if n.drain { - return n.s.Drain() - } - return n.s.Unsubscribe() -} - -func (n *nbroker) Address() string { - if n.conn != nil && n.conn.IsConnected() { - return n.conn.ConnectedUrl() - } - if len(n.addrs) > 0 { - return n.addrs[0] - } - - return "" -} - -func setAddrs(addrs []string) []string { - var cAddrs []string - for _, addr := range addrs { - if len(addr) == 0 { - continue - } - if !strings.HasPrefix(addr, "nats://") { - addr = "nats://" + addr - } - cAddrs = append(cAddrs, addr) - } - if len(cAddrs) == 0 { - cAddrs = []string{nats.DefaultURL} - } - return cAddrs -} - -func (n *nbroker) Connect() error { - n.Lock() - defer n.Unlock() - - status := nats.CLOSED - if n.conn != nil { - status = n.conn.Status() - } - - switch status { - case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING: - return nil - default: // DISCONNECTED or CLOSED or DRAINING - opts := n.nopts - opts.Servers = n.addrs - opts.Secure = n.opts.Secure - opts.TLSConfig = n.opts.TLSConfig - - // secure might not be set - if n.opts.TLSConfig != nil { - opts.Secure = true - } - - c, err := opts.Connect() - if err != nil { - return err - } - n.conn = c - return nil - } -} - -func (n *nbroker) Disconnect() error { - n.RLock() - if n.drain { - n.conn.Drain() - } else { - n.conn.Close() - } - n.RUnlock() - return nil -} - -func (n *nbroker) Init(opts ...broker.Option) error { - for _, o := range opts { - o(&n.opts) - } - n.addrs = setAddrs(n.opts.Addrs) - return nil -} - -func (n *nbroker) Options() broker.Options { - return n.opts -} - -func (n *nbroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { - b, err := n.opts.Codec.Marshal(msg) - if err != nil { - return err - } - n.RLock() - defer n.RUnlock() - return n.conn.Publish(topic, b) -} - -func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - if n.conn == nil { - return nil, errors.New("not connected") - } - - opt := broker.SubscribeOptions{ - AutoAck: true, - Context: context.Background(), - } - - for _, o := range opts { - o(&opt) - } - - var drain bool - if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok { - drain = true - } - - fn := func(msg *nats.Msg) { - var m broker.Message - if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { - return - } - handler(&publication{m: &m, t: msg.Subject}) - } - - var sub *nats.Subscription - var err error - - n.RLock() - if len(opt.Queue) > 0 { - sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn) - } else { - sub, err = n.conn.Subscribe(topic, fn) - } - n.RUnlock() - if err != nil { - return nil, err - } - return &subscriber{s: sub, opts: opt, drain: drain}, nil -} - -func (n *nbroker) String() string { - return "nats" -} - func NewBroker(opts ...broker.Option) broker.Broker { - options := broker.Options{ - // Default codec - Codec: json.Marshaler{}, - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - natsOpts := nats.GetDefaultOptions() - if n, ok := options.Context.Value(optionsKey{}).(nats.Options); ok { - natsOpts = n - } - - var drain bool - if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok { - drain = true - } - - // broker.Options have higher priority than nats.Options - // only if Addrs, Secure or TLSConfig were not set through a broker.Option - // we read them from nats.Option - if len(options.Addrs) == 0 { - options.Addrs = natsOpts.Servers - } - - if !options.Secure { - options.Secure = natsOpts.Secure - } - - if options.TLSConfig == nil { - options.TLSConfig = natsOpts.TLSConfig - } - - nb := &nbroker{ - opts: options, - nopts: natsOpts, - addrs: setAddrs(options.Addrs), - drain: drain, - } - - return nb + return nats.NewBroker(opts...) } diff --git a/nats_test.go b/nats_test.go deleted file mode 100644 index 978ee17..0000000 --- a/nats_test.go +++ /dev/null @@ -1,99 +0,0 @@ -package nats - -import ( - "fmt" - "testing" - - "github.com/micro/go-micro/v2/broker" - nats "github.com/nats-io/nats.go" -) - -var addrTestCases = []struct { - name string - description string - addrs map[string]string // expected address : set address -}{ - { - "brokerOpts", - "set broker addresses through a broker.Option in constructor", - map[string]string{ - "nats://192.168.10.1:5222": "192.168.10.1:5222", - "nats://10.20.10.0:4222": "10.20.10.0:4222"}, - }, - { - "brokerInit", - "set broker addresses through a broker.Option in broker.Init()", - map[string]string{ - "nats://192.168.10.1:5222": "192.168.10.1:5222", - "nats://10.20.10.0:4222": "10.20.10.0:4222"}, - }, - { - "natsOpts", - "set broker addresses through the nats.Option in constructor", - map[string]string{ - "nats://192.168.10.1:5222": "192.168.10.1:5222", - "nats://10.20.10.0:4222": "10.20.10.0:4222"}, - }, - { - "default", - "check if default Address is set correctly", - map[string]string{ - "nats://127.0.0.1:4222": "", - }, - }, -} - -// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used. -func TestInitAddrs(t *testing.T) { - - for _, tc := range addrTestCases { - t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) { - - var br broker.Broker - var addrs []string - - for _, addr := range tc.addrs { - addrs = append(addrs, addr) - } - - switch tc.name { - case "brokerOpts": - // we know that there are just two addrs in the dict - br = NewBroker(broker.Addrs(addrs[0], addrs[1])) - br.Init() - case "brokerInit": - br = NewBroker() - // we know that there are just two addrs in the dict - br.Init(broker.Addrs(addrs[0], addrs[1])) - case "natsOpts": - nopts := nats.GetDefaultOptions() - nopts.Servers = addrs - br = NewBroker(Options(nopts)) - br.Init() - case "default": - br = NewBroker() - br.Init() - } - - natsBroker, ok := br.(*nbroker) - if !ok { - t.Fatal("Expected broker to be of types *nbroker") - } - // check if the same amount of addrs we set has actually been set, default - // have only 1 address nats://127.0.0.1:4222 (current nats code) or - // nats://localhost:4222 (older code version) - if len(natsBroker.addrs) != len(tc.addrs) && tc.name != "default" { - t.Errorf("Expected Addr count = %d, Actual Addr count = %d", - len(natsBroker.addrs), len(tc.addrs)) - } - - for _, addr := range natsBroker.addrs { - _, ok := tc.addrs[addr] - if !ok { - t.Errorf("Expected '%s' has not been set", addr) - } - } - }) - - } -} diff --git a/options.go b/options.go deleted file mode 100644 index dcbe816..0000000 --- a/options.go +++ /dev/null @@ -1,25 +0,0 @@ -package nats - -import ( - "github.com/micro/go-micro/v2/broker" - nats "github.com/nats-io/nats.go" -) - -type optionsKey struct{} -type drainConnectionKey struct{} -type drainSubscriptionKey struct{} - -// Options accepts nats.Options -func Options(opts nats.Options) broker.Option { - return setBrokerOption(optionsKey{}, opts) -} - -// DrainConnection will drain subscription on close -func DrainConnection() broker.Option { - return setBrokerOption(drainConnectionKey{}, true) -} - -// DrainSubscription will drain pending messages when unsubscribe -func DrainSubscription() broker.SubscribeOption { - return setSubscribeOption(drainSubscriptionKey{}, true) -}