diff --git a/server/handler.go b/server/handler.go index ffa0865d..8236c355 100644 --- a/server/handler.go +++ b/server/handler.go @@ -12,6 +12,9 @@ type HandlerOptions struct { type SubscriberOption func(*SubscriberOptions) type SubscriberOptions struct { + // AutoAck defaults to true. When a handler returns + // with a nil error the message is acked. + AutoAck bool Queue string Internal bool Context context.Context @@ -43,6 +46,7 @@ func InternalSubscriber(b bool) SubscriberOption { } func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { opt := SubscriberOptions{ + AutoAck: true, Context: context.Background(), } @@ -53,6 +57,14 @@ func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { return opt } +// DisableAutoAck will disable auto acking of messages +// after they have been handled. +func DisableAutoAck() SubscriberOption { + return func(o *SubscriberOptions) { + o.AutoAck = false + } +} + // Shared queue name distributed messages across subscribers func SubscriberQueue(n string) SubscriberOption { return func(o *SubscriberOptions) { diff --git a/server/rpc_server.go b/server/rpc_server.go index 4f1049c1..b4924846 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -365,9 +365,15 @@ func (s *rpcServer) Register() error { if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.Queue(queue)) } + if cx := sb.Options().Context; cx != nil { opts = append(opts, broker.SubscribeContext(cx)) } + + if !sb.Options().AutoAck { + opts = append(opts, broker.DisableAutoAck()) + } + sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err diff --git a/server/subscriber.go b/server/subscriber.go index a02bc222..b9a50a7b 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -34,7 +34,10 @@ type subscriber struct { } func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { - var options SubscriberOptions + options := SubscriberOptions{ + AutoAck: true, + } + for _, o := range opts { o(&options) }