From 51d2ce2b4f652634d1e6c0060f1e8a82362032d6 Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 22 Jan 2016 21:48:43 +0000 Subject: [PATCH] Support setting subscriber queue name --- server/handler.go | 8 ++++++++ server/rpc_server.go | 6 +++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/server/handler.go b/server/handler.go index a7ec3dc9..b944efeb 100644 --- a/server/handler.go +++ b/server/handler.go @@ -37,6 +37,7 @@ type HandlerOptions struct { } type SubscriberOptions struct { + Queue string Internal bool } @@ -56,3 +57,10 @@ func InternalSubscriber(b bool) SubscriberOption { o.Internal = b } } + +// Shared queue name distributed messages across subscribers +func SubscriberQueue(n string) SubscriberOption { + return func(o *SubscriberOptions) { + o.Queue = n + } +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 939b4b55..4f9ee371 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -229,7 +229,11 @@ func (s *rpcServer) Register() error { for sb, _ := range s.subscribers { handler := s.createSubHandler(sb, s.opts) - sub, err := config.Broker.Subscribe(sb.Topic(), handler) + var opts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.QueueName(queue)) + } + sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err }