From 54a1eae3e24ebdf57ad0ed401aff514cc6fdd411 Mon Sep 17 00:00:00 2001 From: fztcjjl Date: Wed, 29 May 2019 21:57:48 +0800 Subject: [PATCH] durable subscriptions for nats streaming --- options.go | 7 +++++++ stan.go | 7 ++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/options.go b/options.go index 72278d7..a91ecc4 100644 --- a/options.go +++ b/options.go @@ -67,3 +67,10 @@ type connectRetryKey struct{} func ConnectRetry(v bool) broker.Option { return setBrokerOption(connectRetryKey{}, v) } + +type durableKey struct{} + +// DurableName sets the DurableName for the subscriber +func DurableName(name string) broker.Option { + return setBrokerOption(durableKey{}, name) +} diff --git a/stan.go b/stan.go index 58a1897..efe85c8 100644 --- a/stan.go +++ b/stan.go @@ -320,6 +320,11 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro opt.AutoAck = !bopts.ManualAcks + if dn, ok := n.opts.Context.Value(durableKey{}).(string); ok && len(dn) > 0 { + stanOpts = append(stanOpts, stan.DurableName(dn)) + bopts.DurableName = dn + } + fn := func(msg *stan.Msg) { var m broker.Message @@ -367,7 +372,7 @@ func NewBroker(opts ...broker.Option) broker.Broker { o(&options) } - stanOpts := stan.DefaultOptions + stanOpts := stan.GetDefaultOptions() if n, ok := options.Context.Value(optionsKey{}).(stan.Options); ok { stanOpts = n }