From 6c04d1a29b2de0bd7e202fbdb853d0bc7ac78442 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 26 Jan 2019 13:37:58 +0300 Subject: [PATCH] propagate context and SuccessAutoAck option other brokers Signed-off-by: Vasiliy Tolstov --- stan.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/stan.go b/stan.go index b15a4cc..be052b7 100644 --- a/stan.go +++ b/stan.go @@ -176,6 +176,9 @@ func (n *stanBroker) Publish(topic string, msg *broker.Message, opts ...broker.P } func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + if n.conn == nil { + return nil, errors.New("not connected") + } var successAutoAck bool opt := broker.SubscribeOptions{ @@ -197,7 +200,7 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro } var stanOpts []stan.SubscriptionOption - if opt.AutoAck { + if !opt.AutoAck { stanOpts = append(stanOpts, stan.SetManualAckMode()) } @@ -217,6 +220,8 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro } } + opt.AutoAck = !bopts.ManualAcks + fn := func(msg *stan.Msg) { var m broker.Message var err error @@ -224,7 +229,9 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro return } err = handler(&publication{m: &m, msg: msg, t: msg.Subject}) - if err == nil && successAutoAck && bopts.ManualAcks { + if err == nil && successAutoAck && !opt.AutoAck { + msg.Ack() + } else if err != nil && opt.AutoAck { msg.Ack() } }