server: subscribe to topic with own name if router not nil (#1295)

* server: subscribe to topic with own name if router not nil

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-03-05 10:29:50 +03:00 committed by GitHub
parent 67c26c71b6
commit ce2ba71002
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -630,6 +630,8 @@ func (s *rpcServer) Register() error {
// set what we're advertising // set what we're advertising
s.opts.Advertise = addr s.opts.Advertise = addr
// router can exchange messages
if s.opts.Router != nil {
// subscribe to the topic with own name // subscribe to the topic with own name
sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent)
if err != nil { if err != nil {
@ -638,6 +640,7 @@ func (s *rpcServer) Register() error {
// save the subscriber // save the subscriber
s.subscriber = sub s.subscriber = sub
}
// subscribe for all of the subscribers // subscribe for all of the subscribers
for sb := range s.subscribers { for sb := range s.subscribers {
@ -654,11 +657,11 @@ func (s *rpcServer) Register() error {
opts = append(opts, broker.DisableAutoAck()) opts = append(opts, broker.DisableAutoAck())
} }
log.Infof("Subscribing to topic: %s", sub.Topic())
sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...)
if err != nil { if err != nil {
return err return err
} }
log.Infof("Subscribing to topic: %s", sub.Topic())
s.subscribers[sb] = []broker.Subscriber{sub} s.subscribers[sb] = []broker.Subscriber{sub}
} }