better error handling

This commit is contained in:
Asim Aslam 2019-01-03 11:23:06 +00:00
parent abbeb6d068
commit 7c2cbe2ad2

View File

@ -533,7 +533,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) {
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
@ -547,14 +547,13 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
// save on error
h.saveMessage(t, b)
return
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
@ -567,16 +566,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range service.Nodes {
// publish async
pub(node, topic, b)
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
pub(node, topic, b)
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}