diff --git a/stan.go b/stan.go index b15a4cc..be052b7 100644 --- a/stan.go +++ b/stan.go @@ -176,6 +176,9 @@ func (n *stanBroker) Publish(topic string, msg *broker.Message, opts ...broker.P } func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + if n.conn == nil { + return nil, errors.New("not connected") + } var successAutoAck bool opt := broker.SubscribeOptions{ @@ -197,7 +200,7 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro } var stanOpts []stan.SubscriptionOption - if opt.AutoAck { + if !opt.AutoAck { stanOpts = append(stanOpts, stan.SetManualAckMode()) } @@ -217,6 +220,8 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro } } + opt.AutoAck = !bopts.ManualAcks + fn := func(msg *stan.Msg) { var m broker.Message var err error @@ -224,7 +229,9 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro return } err = handler(&publication{m: &m, msg: msg, t: msg.Subject}) - if err == nil && successAutoAck && bopts.ManualAcks { + if err == nil && successAutoAck && !opt.AutoAck { + msg.Ack() + } else if err != nil && opt.AutoAck { msg.Ack() } }