diff --git a/broker/http_broker.go b/broker/http_broker.go index 901d4f4c..3e842b3c 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -533,7 +533,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) } h.RUnlock() - pub := func(node *registry.Node, t string, b []byte) { + pub := func(node *registry.Node, t string, b []byte) error { scheme := "http" // check if secure is added in metadata @@ -547,14 +547,13 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) if err != nil { - // save on error - h.saveMessage(t, b) - return + return err } // discard response body io.Copy(ioutil.Discard, r.Body) r.Body.Close() + return nil } srv := func(s []*registry.Service, b []byte) { @@ -567,16 +566,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) switch service.Version { // broadcast version means broadcast to all nodes case broadcastVersion: + var success bool + + // publish to all nodes for _, node := range service.Nodes { // publish async - pub(node, topic, b) + if err := pub(node, topic, b); err == nil { + success = true + } + } + + // save if it failed to publish at least once + if !success { + h.saveMessage(topic, b) } default: // select node to publish to node := service.Nodes[rand.Int()%len(service.Nodes)] - // publish async - pub(node, topic, b) + // publish async to one node + if err := pub(node, topic, b); err != nil { + // if failed save it + h.saveMessage(topic, b) + } } } }