Support setting subscriber queue name

This commit is contained in:
Asim 2016-01-22 21:48:43 +00:00
parent 93ea171b31
commit 51d2ce2b4f
2 changed files with 13 additions and 1 deletions

View File

@ -37,6 +37,7 @@ type HandlerOptions struct {
} }
type SubscriberOptions struct { type SubscriberOptions struct {
Queue string
Internal bool Internal bool
} }
@ -56,3 +57,10 @@ func InternalSubscriber(b bool) SubscriberOption {
o.Internal = b o.Internal = b
} }
} }
// Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}

View File

@ -229,7 +229,11 @@ func (s *rpcServer) Register() error {
for sb, _ := range s.subscribers { for sb, _ := range s.subscribers {
handler := s.createSubHandler(sb, s.opts) handler := s.createSubHandler(sb, s.opts)
sub, err := config.Broker.Subscribe(sb.Topic(), handler) var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.QueueName(queue))
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {
return err return err
} }