diff --git a/nats.go b/nats.go index b4da34c..539c248 100644 --- a/nats.go +++ b/nats.go @@ -19,11 +19,13 @@ type nbroker struct { conn *nats.Conn opts broker.Options nopts nats.Options + drain bool } type subscriber struct { - s *nats.Subscription - opts broker.SubscribeOptions + s *nats.Subscription + opts broker.SubscribeOptions + drain bool } type publication struct { @@ -56,6 +58,9 @@ func (n *subscriber) Topic() string { } func (n *subscriber) Unsubscribe() error { + if n.drain { + return n.s.Drain() + } return n.s.Unsubscribe() } @@ -121,7 +126,11 @@ func (n *nbroker) Connect() error { func (n *nbroker) Disconnect() error { n.RLock() - n.conn.Close() + if n.drain { + n.conn.Drain() + } else { + n.conn.Close() + } n.RUnlock() return nil } @@ -155,12 +164,18 @@ func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker opt := broker.SubscribeOptions{ AutoAck: true, + Context: context.Background(), } for _, o := range opts { o(&opt) } + var drain bool + if _, ok := opt.Context.Value(drainSubscriptionKey{}).(bool); ok { + drain = true + } + fn := func(msg *nats.Msg) { var m broker.Message if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { @@ -182,7 +197,7 @@ func (n *nbroker) Subscribe(topic string, handler broker.Handler, opts ...broker if err != nil { return nil, err } - return &subscriber{s: sub, opts: opt}, nil + return &subscriber{s: sub, opts: opt, drain: drain}, nil } func (n *nbroker) String() string { @@ -205,6 +220,11 @@ func NewBroker(opts ...broker.Option) broker.Broker { natsOpts = n } + var drain bool + if _, ok := options.Context.Value(drainSubscriptionKey{}).(bool); ok { + drain = true + } + // broker.Options have higher priority than nats.Options // only if Addrs, Secure or TLSConfig were not set through a broker.Option // we read them from nats.Option @@ -224,6 +244,7 @@ func NewBroker(opts ...broker.Option) broker.Broker { opts: options, nopts: natsOpts, addrs: setAddrs(options.Addrs), + drain: drain, } return nb diff --git a/options.go b/options.go index 96e2fb7..d942897 100644 --- a/options.go +++ b/options.go @@ -6,8 +6,20 @@ import ( ) type optionsKey struct{} +type drainConnectionKey struct{} +type drainSubscriptionKey struct{} // Options accepts nats.Options func Options(opts nats.Options) broker.Option { return setBrokerOption(optionsKey{}, opts) } + +// DrainConnection will drain subscription on close +func DrainConnection() broker.Option { + return setBrokerOption(drainConnectionKey{}, true) +} + +// DrainSubscription will drain pending messages when unsubscribe +func DrainSubscription() broker.SubscribeOption { + return setSubscribeOption(drainSubscriptionKey{}, true) +}