Merge pull request #364 from micro/inbox
Add inbox feature to http broker
This commit is contained in:
		| @@ -45,6 +45,10 @@ type httpBroker struct { | |||||||
| 	subscribers map[string][]*httpSubscriber | 	subscribers map[string][]*httpSubscriber | ||||||
| 	running     bool | 	running     bool | ||||||
| 	exit        chan chan error | 	exit        chan chan error | ||||||
|  |  | ||||||
|  | 	// offline message inbox | ||||||
|  | 	mtx   sync.RWMutex | ||||||
|  | 	inbox map[string][][]byte | ||||||
| } | } | ||||||
|  |  | ||||||
| type httpSubscriber struct { | type httpSubscriber struct { | ||||||
| @@ -133,6 +137,7 @@ func newHttpBroker(opts ...Option) Broker { | |||||||
| 		subscribers: make(map[string][]*httpSubscriber), | 		subscribers: make(map[string][]*httpSubscriber), | ||||||
| 		exit:        make(chan chan error), | 		exit:        make(chan chan error), | ||||||
| 		mux:         http.NewServeMux(), | 		mux:         http.NewServeMux(), | ||||||
|  | 		inbox:       make(map[string][][]byte), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// specify the message handler | 	// specify the message handler | ||||||
| @@ -175,6 +180,49 @@ func (h *httpSubscriber) Unsubscribe() error { | |||||||
| 	return h.hb.unsubscribe(h) | 	return h.hb.unsubscribe(h) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (h *httpBroker) saveMessage(topic string, msg []byte) { | ||||||
|  | 	h.mtx.Lock() | ||||||
|  | 	defer h.mtx.Unlock() | ||||||
|  |  | ||||||
|  | 	// get messages | ||||||
|  | 	c := h.inbox[topic] | ||||||
|  |  | ||||||
|  | 	// save message | ||||||
|  | 	c = append(c, msg) | ||||||
|  |  | ||||||
|  | 	// max length 64 | ||||||
|  | 	if len(c) > 64 { | ||||||
|  | 		c = c[:64] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// save inbox | ||||||
|  | 	h.inbox[topic] = c | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *httpBroker) getMessage(topic string, num int) [][]byte { | ||||||
|  | 	h.mtx.Lock() | ||||||
|  | 	defer h.mtx.Unlock() | ||||||
|  |  | ||||||
|  | 	// get messages | ||||||
|  | 	c, ok := h.inbox[topic] | ||||||
|  | 	if !ok { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// more message than requests | ||||||
|  | 	if len(c) >= num { | ||||||
|  | 		msg := c[:num] | ||||||
|  | 		h.inbox[topic] = c[num:] | ||||||
|  | 		return msg | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// reset inbox | ||||||
|  | 	h.inbox[topic] = nil | ||||||
|  |  | ||||||
|  | 	// return all messages | ||||||
|  | 	return c | ||||||
|  | } | ||||||
|  |  | ||||||
| func (h *httpBroker) subscribe(s *httpSubscriber) error { | func (h *httpBroker) subscribe(s *httpSubscriber) error { | ||||||
| 	h.Lock() | 	h.Lock() | ||||||
| 	defer h.Unlock() | 	defer h.Unlock() | ||||||
| @@ -454,14 +502,7 @@ func (h *httpBroker) Options() Options { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { | func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { | ||||||
| 	h.RLock() | 	// create the message first | ||||||
| 	s, err := h.r.GetService("topic:" + topic) |  | ||||||
| 	if err != nil { |  | ||||||
| 		h.RUnlock() |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	h.RUnlock() |  | ||||||
|  |  | ||||||
| 	m := &Message{ | 	m := &Message{ | ||||||
| 		Header: make(map[string]string), | 		Header: make(map[string]string), | ||||||
| 		Body:   msg.Body, | 		Body:   msg.Body, | ||||||
| @@ -473,12 +514,26 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
|  |  | ||||||
| 	m.Header[":topic"] = topic | 	m.Header[":topic"] = topic | ||||||
|  |  | ||||||
|  | 	// encode the message | ||||||
| 	b, err := h.opts.Codec.Marshal(m) | 	b, err := h.opts.Codec.Marshal(m) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	pub := func(node *registry.Node, b []byte) { | 	// save the message | ||||||
|  | 	h.saveMessage(topic, b) | ||||||
|  |  | ||||||
|  | 	// now attempt to get the service | ||||||
|  | 	h.RLock() | ||||||
|  | 	s, err := h.r.GetService("topic:" + topic) | ||||||
|  | 	if err != nil { | ||||||
|  | 		h.RUnlock() | ||||||
|  | 		// ignore error | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	h.RUnlock() | ||||||
|  |  | ||||||
|  | 	pub := func(node *registry.Node, t string, b []byte) error { | ||||||
| 		scheme := "http" | 		scheme := "http" | ||||||
|  |  | ||||||
| 		// check if secure is added in metadata | 		// check if secure is added in metadata | ||||||
| @@ -491,34 +546,71 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
|  |  | ||||||
| 		uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) | 		uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) | ||||||
| 		r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) | 		r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) | ||||||
| 		if err == nil { | 		if err != nil { | ||||||
| 			io.Copy(ioutil.Discard, r.Body) | 			return err | ||||||
| 			r.Body.Close() |  | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		// discard response body | ||||||
|  | 		io.Copy(ioutil.Discard, r.Body) | ||||||
|  | 		r.Body.Close() | ||||||
|  | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, service := range s { | 	srv := func(s []*registry.Service, b []byte) { | ||||||
| 		// only process if we have nodes | 		for _, service := range s { | ||||||
| 		if len(service.Nodes) == 0 { | 			// only process if we have nodes | ||||||
| 			continue | 			if len(service.Nodes) == 0 { | ||||||
| 		} | 				continue | ||||||
|  |  | ||||||
| 		switch service.Version { |  | ||||||
| 		// broadcast version means broadcast to all nodes |  | ||||||
| 		case broadcastVersion: |  | ||||||
| 			for _, node := range service.Nodes { |  | ||||||
| 				// publish async |  | ||||||
| 				go pub(node, b) |  | ||||||
| 			} | 			} | ||||||
| 		default: |  | ||||||
| 			// select node to publish to |  | ||||||
| 			node := service.Nodes[rand.Int()%len(service.Nodes)] |  | ||||||
|  |  | ||||||
| 			// publish async | 			switch service.Version { | ||||||
| 			go pub(node, b) | 			// broadcast version means broadcast to all nodes | ||||||
|  | 			case broadcastVersion: | ||||||
|  | 				var success bool | ||||||
|  |  | ||||||
|  | 				// publish to all nodes | ||||||
|  | 				for _, node := range service.Nodes { | ||||||
|  | 					// publish async | ||||||
|  | 					if err := pub(node, topic, b); err == nil { | ||||||
|  | 						success = true | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				// save if it failed to publish at least once | ||||||
|  | 				if !success { | ||||||
|  | 					h.saveMessage(topic, b) | ||||||
|  | 				} | ||||||
|  | 			default: | ||||||
|  | 				// select node to publish to | ||||||
|  | 				node := service.Nodes[rand.Int()%len(service.Nodes)] | ||||||
|  |  | ||||||
|  | 				// publish async to one node | ||||||
|  | 				if err := pub(node, topic, b); err != nil { | ||||||
|  | 					// if failed save it | ||||||
|  | 					h.saveMessage(topic, b) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// do the rest async | ||||||
|  | 	go func() { | ||||||
|  | 		// get a third of the backlog | ||||||
|  | 		messages := h.getMessage(topic, 8) | ||||||
|  | 		delay := (len(messages) > 1) | ||||||
|  |  | ||||||
|  | 		// publish all the messages | ||||||
|  | 		for _, msg := range messages { | ||||||
|  | 			// serialize here | ||||||
|  | 			srv(s, msg) | ||||||
|  |  | ||||||
|  | 			// sending a backlog of messages | ||||||
|  | 			if delay { | ||||||
|  | 				time.Sleep(time.Millisecond * 100) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user