From abbeb6d0688645a1cf2bd7899023225d096f102d Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 2 Jan 2019 19:27:46 +0000 Subject: [PATCH] add inbox feature to http broker --- broker/http_broker.go | 138 +++++++++++++++++++++++++++++++++--------- 1 file changed, 109 insertions(+), 29 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index a761a197..901d4f4c 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -45,6 +45,10 @@ type httpBroker struct { subscribers map[string][]*httpSubscriber running bool exit chan chan error + + // offline message inbox + mtx sync.RWMutex + inbox map[string][][]byte } type httpSubscriber struct { @@ -133,6 +137,7 @@ func newHttpBroker(opts ...Option) Broker { subscribers: make(map[string][]*httpSubscriber), exit: make(chan chan error), mux: http.NewServeMux(), + inbox: make(map[string][][]byte), } // specify the message handler @@ -175,6 +180,49 @@ func (h *httpSubscriber) Unsubscribe() error { return h.hb.unsubscribe(h) } +func (h *httpBroker) saveMessage(topic string, msg []byte) { + h.mtx.Lock() + defer h.mtx.Unlock() + + // get messages + c := h.inbox[topic] + + // save message + c = append(c, msg) + + // max length 64 + if len(c) > 64 { + c = c[:64] + } + + // save inbox + h.inbox[topic] = c +} + +func (h *httpBroker) getMessage(topic string, num int) [][]byte { + h.mtx.Lock() + defer h.mtx.Unlock() + + // get messages + c, ok := h.inbox[topic] + if !ok { + return nil + } + + // more message than requests + if len(c) >= num { + msg := c[:num] + h.inbox[topic] = c[num:] + return msg + } + + // reset inbox + h.inbox[topic] = nil + + // return all messages + return c +} + func (h *httpBroker) subscribe(s *httpSubscriber) error { h.Lock() defer h.Unlock() @@ -454,14 +502,7 @@ func (h *httpBroker) Options() Options { } func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { - h.RLock() - s, err := h.r.GetService("topic:" + topic) - if err != nil { - h.RUnlock() - return err - } - h.RUnlock() - + // create the message first m := &Message{ Header: make(map[string]string), Body: msg.Body, @@ -473,12 +514,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) m.Header[":topic"] = topic + // encode the message b, err := h.opts.Codec.Marshal(m) if err != nil { return err } - pub := func(node *registry.Node, b []byte) { + // save the message + h.saveMessage(topic, b) + + // now attempt to get the service + h.RLock() + s, err := h.r.GetService("topic:" + topic) + if err != nil { + h.RUnlock() + // ignore error + return nil + } + h.RUnlock() + + pub := func(node *registry.Node, t string, b []byte) { scheme := "http" // check if secure is added in metadata @@ -491,34 +546,59 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) - if err == nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() + if err != nil { + // save on error + h.saveMessage(t, b) + return } + + // discard response body + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() } - for _, service := range s { - // only process if we have nodes - if len(service.Nodes) == 0 { - continue - } - - switch service.Version { - // broadcast version means broadcast to all nodes - case broadcastVersion: - for _, node := range service.Nodes { - // publish async - go pub(node, b) + srv := func(s []*registry.Service, b []byte) { + for _, service := range s { + // only process if we have nodes + if len(service.Nodes) == 0 { + continue } - default: - // select node to publish to - node := service.Nodes[rand.Int()%len(service.Nodes)] - // publish async - go pub(node, b) + switch service.Version { + // broadcast version means broadcast to all nodes + case broadcastVersion: + for _, node := range service.Nodes { + // publish async + pub(node, topic, b) + } + default: + // select node to publish to + node := service.Nodes[rand.Int()%len(service.Nodes)] + + // publish async + pub(node, topic, b) + } } } + // do the rest async + go func() { + // get a third of the backlog + messages := h.getMessage(topic, 8) + delay := (len(messages) > 1) + + // publish all the messages + for _, msg := range messages { + // serialize here + srv(s, msg) + + // sending a backlog of messages + if delay { + time.Sleep(time.Millisecond * 100) + } + } + }() + return nil }