durable subscriptions for nats streaming

This commit is contained in:
fztcjjl 2019-05-29 21:57:48 +08:00 committed by Vasiliy Tolstov
parent de6f824e4a
commit 54a1eae3e2
2 changed files with 13 additions and 1 deletions

View File

@ -67,3 +67,10 @@ type connectRetryKey struct{}
func ConnectRetry(v bool) broker.Option { func ConnectRetry(v bool) broker.Option {
return setBrokerOption(connectRetryKey{}, v) return setBrokerOption(connectRetryKey{}, v)
} }
type durableKey struct{}
// DurableName sets the DurableName for the subscriber
func DurableName(name string) broker.Option {
return setBrokerOption(durableKey{}, name)
}

View File

@ -320,6 +320,11 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
opt.AutoAck = !bopts.ManualAcks opt.AutoAck = !bopts.ManualAcks
if dn, ok := n.opts.Context.Value(durableKey{}).(string); ok && len(dn) > 0 {
stanOpts = append(stanOpts, stan.DurableName(dn))
bopts.DurableName = dn
}
fn := func(msg *stan.Msg) { fn := func(msg *stan.Msg) {
var m broker.Message var m broker.Message
@ -367,7 +372,7 @@ func NewBroker(opts ...broker.Option) broker.Broker {
o(&options) o(&options)
} }
stanOpts := stan.DefaultOptions stanOpts := stan.GetDefaultOptions()
if n, ok := options.Context.Value(optionsKey{}).(stan.Options); ok { if n, ok := options.Context.Value(optionsKey{}).(stan.Options); ok {
stanOpts = n stanOpts = n
} }