Merge pull request #364 from micro/inbox

Add inbox feature to http broker
This commit is contained in:
Asim Aslam 2019-01-03 11:27:46 +00:00 committed by GitHub
commit 461df8d464
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -45,6 +45,10 @@ type httpBroker struct {
subscribers map[string][]*httpSubscriber subscribers map[string][]*httpSubscriber
running bool running bool
exit chan chan error exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte
} }
type httpSubscriber struct { type httpSubscriber struct {
@ -133,6 +137,7 @@ func newHttpBroker(opts ...Option) Broker {
subscribers: make(map[string][]*httpSubscriber), subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error), exit: make(chan chan error),
mux: http.NewServeMux(), mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
} }
// specify the message handler // specify the message handler
@ -175,6 +180,49 @@ func (h *httpSubscriber) Unsubscribe() error {
return h.hb.unsubscribe(h) return h.hb.unsubscribe(h)
} }
func (h *httpBroker) saveMessage(topic string, msg []byte) {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c := h.inbox[topic]
// save message
c = append(c, msg)
// max length 64
if len(c) > 64 {
c = c[:64]
}
// save inbox
h.inbox[topic] = c
}
func (h *httpBroker) getMessage(topic string, num int) [][]byte {
h.mtx.Lock()
defer h.mtx.Unlock()
// get messages
c, ok := h.inbox[topic]
if !ok {
return nil
}
// more message than requests
if len(c) >= num {
msg := c[:num]
h.inbox[topic] = c[num:]
return msg
}
// reset inbox
h.inbox[topic] = nil
// return all messages
return c
}
func (h *httpBroker) subscribe(s *httpSubscriber) error { func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
@ -454,14 +502,7 @@ func (h *httpBroker) Options() Options {
} }
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock() // create the message first
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
m := &Message{ m := &Message{
Header: make(map[string]string), Header: make(map[string]string),
Body: msg.Body, Body: msg.Body,
@ -473,12 +514,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
m.Header[":topic"] = topic m.Header[":topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m) b, err := h.opts.Codec.Marshal(m)
if err != nil { if err != nil {
return err return err
} }
pub := func(node *registry.Node, b []byte) { // save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
// ignore error
return nil
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http" scheme := "http"
// check if secure is added in metadata // check if secure is added in metadata
@ -491,12 +546,17 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err == nil { if err != nil {
io.Copy(ioutil.Discard, r.Body) return err
r.Body.Close()
}
} }
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s { for _, service := range s {
// only process if we have nodes // only process if we have nodes
if len(service.Nodes) == 0 { if len(service.Nodes) == 0 {
@ -506,18 +566,50 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
switch service.Version { switch service.Version {
// broadcast version means broadcast to all nodes // broadcast version means broadcast to all nodes
case broadcastVersion: case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range service.Nodes { for _, node := range service.Nodes {
// publish async // publish async
go pub(node, b) if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
} }
default: default:
// select node to publish to // select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)] node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async // publish async to one node
go pub(node, b) if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
} }
} }
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil return nil
} }