From 3aedea4c56a6ea15927bdb580f4b6fced767181a Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 17 Oct 2019 18:37:37 +0100 Subject: [PATCH] strip topic from http broker subscribe service name --- broker/http_broker.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index 4a22fe12..c2ae4dda 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -527,7 +527,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) // now attempt to get the service h.RLock() - s, err := h.r.GetService("topic:" + topic) + s, err := h.r.GetService(topic) if err != nil { h.RUnlock() // ignore error @@ -565,13 +565,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) continue } + var nodes []*registry.Node + + for _, node := range service.Nodes { + // only use nodes tagged with broker http + if node.Metadata["broker"] != "http" { + continue + } + + // look for nodes for the topic + if node.Metadata["topic"] != topic { + continue + } + + nodes = append(nodes, node) + } + switch service.Version { // broadcast version means broadcast to all nodes case broadcastVersion: var success bool // publish to all nodes - for _, node := range service.Nodes { + for _, node := range nodes { // publish async if err := pub(node, topic, b); err == nil { success = true @@ -584,7 +600,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) } default: // select node to publish to - node := service.Nodes[rand.Int()%len(service.Nodes)] + node := nodes[rand.Int()%len(nodes)] // publish async to one node if err := pub(node, topic, b); err != nil { @@ -647,6 +663,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO Address: mnet.HostPort(addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), + "broker": "http", + "topic": topic, }, } @@ -657,7 +675,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO } service := ®istry.Service{ - Name: "topic:" + topic, + Name: topic, Version: version, Nodes: []*registry.Node{node}, }