From c5fc37933f9a4a517f3a83e137ac6fcf6f298ce5 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 23 Jan 2019 01:39:42 +0300 Subject: [PATCH] initial import of stan - nats streaming broker Signed-off-by: Vasiliy Tolstov --- context.go | 37 +++++++ options.go | 42 ++++++++ stan.go | 279 +++++++++++++++++++++++++++++++++++++++++++++++++++ stan_test.go | 97 ++++++++++++++++++ 4 files changed, 455 insertions(+) create mode 100644 context.go create mode 100644 options.go create mode 100644 stan.go create mode 100644 stan_test.go diff --git a/context.go b/context.go new file mode 100644 index 0000000..8672894 --- /dev/null +++ b/context.go @@ -0,0 +1,37 @@ +package stan + +import ( + "context" + + "github.com/micro/go-micro/broker" +) + +// setSubscribeOption returns a function to setup a context with given value +func setSubscribeOption(k, v interface{}) broker.SubscribeOption { + return func(o *broker.SubscribeOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// setBrokerOption returns a function to setup a context with given value +func setBrokerOption(k, v interface{}) broker.Option { + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} + +// setPublishOption returns a function to setup a context with given value +func setPublishOption(k, v interface{}) broker.PublishOption { + return func(o *broker.PublishOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, k, v) + } +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..174105f --- /dev/null +++ b/options.go @@ -0,0 +1,42 @@ +package stan + +import ( + "context" + + "github.com/micro/go-micro/broker" + stan "github.com/nats-io/go-nats-streaming" +) + +type optionsKey struct{} + +// Options accepts stan.Options +func Options(opts stan.Options) broker.Option { + return setBrokerOption(optionsKey{}, opts) +} + +type clusterIDKey struct{} + +// ClusterID specify cluster id to connect +func ClusterID(clusterID string) broker.Option { + return setBrokerOption(clusterIDKey{}, clusterID) +} + +type subscribeOptionKey struct{} + +func SubscribeOption(opts ...stan.SubscriptionOption) broker.SubscribeOption { + return setSubscribeOption(subscribeOptionKey{}, opts) +} + +type subscribeContextKey struct{} + +// SubscribeContext set the context for broker.SubscribeOption +func SubscribeContext(ctx context.Context) broker.SubscribeOption { + return setSubscribeOption(subscribeContextKey{}, ctx) +} + +type successAutoAckKey struct{} + +// SuccessAutoAck allow to AutoAck messages when handler returns without error +func SuccessAutoAck() broker.SubscribeOption { + return setSubscribeOption(successAutoAckKey{}, true) +} diff --git a/stan.go b/stan.go new file mode 100644 index 0000000..b15a4cc --- /dev/null +++ b/stan.go @@ -0,0 +1,279 @@ +// Package stan provides a NATS Streaming broker +package stan + +import ( + "context" + "errors" + "strings" + "sync" + + "github.com/google/uuid" + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec/json" + stan "github.com/nats-io/go-nats-streaming" +) + +type stanBroker struct { + sync.RWMutex + addrs []string + conn stan.Conn + opts broker.Options + nopts stan.Options +} + +type subscriber struct { + t string + s stan.Subscription + dq bool + opts broker.SubscribeOptions +} + +type publication struct { + t string + msg *stan.Msg + m *broker.Message +} + +func init() { + cmd.DefaultBrokers["stan"] = NewBroker +} + +func (n *publication) Topic() string { + return n.t +} + +func (n *publication) Message() *broker.Message { + return n.m +} + +func (n *publication) Ack() error { + return n.msg.Ack() +} + +func (n *subscriber) Options() broker.SubscribeOptions { + return n.opts +} + +func (n *subscriber) Topic() string { + return n.t +} + +func (n *subscriber) Unsubscribe() error { + if n.s == nil { + return nil + } + // go-micro server Unsubscribe can't handle durable queues, so close as stan suggested + // from nats streaming readme: + // When a client disconnects, the streaming server is not notified, hence the importance of calling Close() + if !n.dq { + err := n.s.Unsubscribe() + if err != nil { + return err + } + } + return n.Close() +} + +func (n *subscriber) Close() error { + if n.s != nil { + return n.s.Close() + } + return nil +} + +func (n *stanBroker) Address() string { + // stan does not support connected server info + if len(n.addrs) > 0 { + return n.addrs[0] + } + + return "" +} + +func setAddrs(addrs []string) []string { + var cAddrs []string + for _, addr := range addrs { + if len(addr) == 0 { + continue + } + if !strings.HasPrefix(addr, "nats://") { + addr = "nats://" + addr + } + cAddrs = append(cAddrs, addr) + } + if len(cAddrs) == 0 { + cAddrs = []string{stan.DefaultNatsURL} + } + return cAddrs +} + +func (n *stanBroker) Connect() error { + n.RLock() + if n.conn != nil { + n.RUnlock() + return nil + } + n.RUnlock() + + opts := n.nopts + opts.NatsURL = strings.Join(n.addrs, ",") + + clusterID, ok := n.opts.Context.Value(clusterIDKey{}).(string) + if !ok || len(clusterID) == 0 { + return errors.New("must specify ClusterID Option") + } + + clientID := uuid.New().String() + + nopts := []stan.Option{ + stan.NatsURL(opts.NatsURL), + stan.NatsConn(opts.NatsConn), + stan.ConnectWait(opts.ConnectTimeout), + stan.PubAckWait(opts.AckTimeout), + stan.MaxPubAcksInflight(opts.MaxPubAcksInflight), + stan.Pings(opts.PingIterval, opts.PingMaxOut), + stan.SetConnectionLostHandler(opts.ConnectionLostCB), + } + + c, err := stan.Connect(clusterID, clientID, nopts...) + if err != nil { + return err + } + n.Lock() + n.conn = c + n.Unlock() + return nil +} + +func (n *stanBroker) Disconnect() error { + n.RLock() + n.conn.Close() + n.RUnlock() + return nil +} + +func (n *stanBroker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&n.opts) + } + n.addrs = setAddrs(n.opts.Addrs) + return nil +} + +func (n *stanBroker) Options() broker.Options { + return n.opts +} + +func (n *stanBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + b, err := n.opts.Codec.Marshal(msg) + if err != nil { + return err + } + n.RLock() + defer n.RUnlock() + return n.conn.Publish(topic, b) +} + +func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + var successAutoAck bool + + opt := broker.SubscribeOptions{ + AutoAck: true, + } + + for _, o := range opts { + o(&opt) + } + + // Make sure context is setup + if opt.Context == nil { + opt.Context = context.Background() + } + + ctx := opt.Context + if subscribeContext, ok := ctx.Value(subscribeContextKey{}).(context.Context); ok && subscribeContext != nil { + ctx = subscribeContext + } + + var stanOpts []stan.SubscriptionOption + if opt.AutoAck { + stanOpts = append(stanOpts, stan.SetManualAckMode()) + } + + if subOpts, ok := ctx.Value(subscribeOptionKey{}).([]stan.SubscriptionOption); ok && len(subOpts) > 0 { + stanOpts = append(stanOpts, subOpts...) + } + + if bval, ok := ctx.Value(successAutoAckKey{}).(bool); ok && bval { + stanOpts = append(stanOpts, stan.SetManualAckMode()) + successAutoAck = true + } + + bopts := stan.DefaultSubscriptionOptions + for _, bopt := range stanOpts { + if err := bopt(&bopts); err != nil { + return nil, err + } + } + + fn := func(msg *stan.Msg) { + var m broker.Message + var err error + if err = n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { + return + } + err = handler(&publication{m: &m, msg: msg, t: msg.Subject}) + if err == nil && successAutoAck && bopts.ManualAcks { + msg.Ack() + } + } + + var sub stan.Subscription + var err error + + n.RLock() + if len(opt.Queue) > 0 { + sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn, stanOpts...) + } else { + sub, err = n.conn.Subscribe(topic, fn, stanOpts...) + } + n.RUnlock() + if err != nil { + return nil, err + } + return &subscriber{dq: len(bopts.DurableName) > 0, s: sub, opts: opt, t: topic}, nil +} + +func (n *stanBroker) String() string { + return "stan" +} + +func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + // Default codec + Codec: json.Marshaler{}, + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + stanOpts := stan.DefaultOptions + if n, ok := options.Context.Value(optionsKey{}).(stan.Options); ok { + stanOpts = n + } + + if len(options.Addrs) == 0 { + options.Addrs = strings.Split(stanOpts.NatsURL, ",") + } + + nb := &stanBroker{ + opts: options, + nopts: stanOpts, + addrs: setAddrs(options.Addrs), + } + + return nb +} diff --git a/stan_test.go b/stan_test.go new file mode 100644 index 0000000..c2b2b8e --- /dev/null +++ b/stan_test.go @@ -0,0 +1,97 @@ +package stan + +import ( + "fmt" + "strings" + "testing" + + "github.com/micro/go-micro/broker" + stan "github.com/nats-io/go-nats-streaming" +) + +var addrTestCases = []struct { + name string + description string + addrs map[string]string // expected address : set address +}{ + { + "brokerOpts", + "set broker addresses through a broker.Option in constructor", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "brokerInit", + "set broker addresses through a broker.Option in broker.Init()", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "natsOpts", + "set broker addresses through the snats.Option in constructor", + map[string]string{ + "nats://192.168.10.1:5222": "192.168.10.1:5222", + "nats://10.20.10.0:4222": "10.20.10.0:4222"}, + }, + { + "default", + "check if default Address is set correctly", + map[string]string{ + "nats://localhost:4222": ""}, + }, +} + +// TestInitAddrs tests issue #100. Ensures that if the addrs is set by an option in init it will be used. +func TestInitAddrs(t *testing.T) { + + for _, tc := range addrTestCases { + t.Run(fmt.Sprintf("%s: %s", tc.name, tc.description), func(t *testing.T) { + + var br broker.Broker + var addrs []string + + for _, addr := range tc.addrs { + addrs = append(addrs, addr) + } + + switch tc.name { + case "brokerOpts": + // we know that there are just two addrs in the dict + br = NewBroker(broker.Addrs(addrs[0], addrs[1])) + br.Init() + case "brokerInit": + br = NewBroker() + // we know that there are just two addrs in the dict + br.Init(broker.Addrs(addrs[0], addrs[1])) + case "natsOpts": + nopts := stan.DefaultOptions + nopts.NatsURL = strings.Join(addrs, ",") + br = NewBroker(Options(nopts)) + br.Init() + case "default": + br = NewBroker() + br.Init() + } + + stanBroker, ok := br.(*stanBroker) + if !ok { + t.Fatal("Expected broker to be of types *stanBroker") + } + // check if the same amount of addrs we set has actually been set + if len(stanBroker.addrs) != len(tc.addrs) { + t.Errorf("Expected Addr count = %d, Actual Addr count = %d", + len(stanBroker.addrs), len(tc.addrs)) + } + + for _, addr := range stanBroker.addrs { + _, ok := tc.addrs[addr] + if !ok { + t.Errorf("Expected '%s' has not been set", addr) + } + } + }) + + } +}