From 8873e6ad088236822f078cb81a1702ea33bf7d1f Mon Sep 17 00:00:00 2001 From: Asim Date: Mon, 21 Nov 2016 15:58:39 +0100 Subject: [PATCH] we need to not return after processing broadcast, still gotta process the other queues --- broker/http_broker.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index eca52d51..09b8aa0a 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -375,18 +375,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) } for _, service := range s { + // only process if we have nodes + if len(service.Nodes) == 0 { + continue + } + + switch service.Version { // broadcast version means broadcast to all nodes - if service.Version == broadcastVersion { + case broadcastVersion: for _, node := range service.Nodes { // publish async go fn(node, b) } - return nil - } - node := service.Nodes[rand.Int()%len(service.Nodes)] - // publish async - go fn(node, b) + default: + // select node to publish to + node := service.Nodes[rand.Int()%len(service.Nodes)] + + // publish async + go fn(node, b) + } } return nil