diff --git a/server/handler.go b/server/handler.go index a7ec3dc9..b944efeb 100644 --- a/server/handler.go +++ b/server/handler.go @@ -37,6 +37,7 @@ type HandlerOptions struct { } type SubscriberOptions struct { + Queue string Internal bool } @@ -56,3 +57,10 @@ func InternalSubscriber(b bool) SubscriberOption { o.Internal = b } } + +// Shared queue name distributed messages across subscribers +func SubscriberQueue(n string) SubscriberOption { + return func(o *SubscriberOptions) { + o.Queue = n + } +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 939b4b55..4f9ee371 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -229,7 +229,11 @@ func (s *rpcServer) Register() error { for sb, _ := range s.subscribers { handler := s.createSubHandler(sb, s.opts) - sub, err := config.Broker.Subscribe(sb.Topic(), handler) + var opts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.QueueName(queue)) + } + sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err }