propagate context and SuccessAutoAck option other brokers

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2019-01-26 13:37:58 +03:00
parent c5fc37933f
commit 6c04d1a29b

11
stan.go
View File

@ -176,6 +176,9 @@ func (n *stanBroker) Publish(topic string, msg *broker.Message, opts ...broker.P
}
func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
if n.conn == nil {
return nil, errors.New("not connected")
}
var successAutoAck bool
opt := broker.SubscribeOptions{
@ -197,7 +200,7 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
}
var stanOpts []stan.SubscriptionOption
if opt.AutoAck {
if !opt.AutoAck {
stanOpts = append(stanOpts, stan.SetManualAckMode())
}
@ -217,6 +220,8 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
}
}
opt.AutoAck = !bopts.ManualAcks
fn := func(msg *stan.Msg) {
var m broker.Message
var err error
@ -224,7 +229,9 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
return
}
err = handler(&publication{m: &m, msg: msg, t: msg.Subject})
if err == nil && successAutoAck && bopts.ManualAcks {
if err == nil && successAutoAck && !opt.AutoAck {
msg.Ack()
} else if err != nil && opt.AutoAck {
msg.Ack()
}
}