add AutoAck support for Server

This commit is contained in:
shu xian 2019-05-24 20:06:27 +08:00
parent 0a3df1b2f3
commit fac42bc1a9
3 changed files with 22 additions and 1 deletions

View File

@ -12,6 +12,9 @@ type HandlerOptions struct {
type SubscriberOption func(*SubscriberOptions) type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct { type SubscriberOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
AutoAck bool
Queue string Queue string
Internal bool Internal bool
Context context.Context Context context.Context
@ -43,6 +46,7 @@ func InternalSubscriber(b bool) SubscriberOption {
} }
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
opt := SubscriberOptions{ opt := SubscriberOptions{
AutoAck: true,
Context: context.Background(), Context: context.Background(),
} }
@ -53,6 +57,14 @@ func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
return opt return opt
} }
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = false
}
}
// Shared queue name distributed messages across subscribers // Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption { func SubscriberQueue(n string) SubscriberOption {
return func(o *SubscriberOptions) { return func(o *SubscriberOptions) {

View File

@ -365,9 +365,15 @@ func (s *rpcServer) Register() error {
if queue := sb.Options().Queue; len(queue) > 0 { if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue)) opts = append(opts, broker.Queue(queue))
} }
if cx := sb.Options().Context; cx != nil { if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx)) opts = append(opts, broker.SubscribeContext(cx))
} }
if !sb.Options().AutoAck {
opts = append(opts, broker.DisableAutoAck())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {
return err return err

View File

@ -34,7 +34,10 @@ type subscriber struct {
} }
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var options SubscriberOptions options := SubscriberOptions{
AutoAck: true,
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }