Merge pull request #864 from micro/strip-topic

strip topic from http broker subscribe service name
This commit is contained in:
Asim Aslam 2019-10-17 18:48:46 +01:00 committed by GitHub
commit 63fd8b9d1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -527,7 +527,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
// now attempt to get the service // now attempt to get the service
h.RLock() h.RLock()
s, err := h.r.GetService("topic:" + topic) s, err := h.r.GetService(topic)
if err != nil { if err != nil {
h.RUnlock() h.RUnlock()
// ignore error // ignore error
@ -565,13 +565,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
continue continue
} }
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
switch service.Version { switch service.Version {
// broadcast version means broadcast to all nodes // broadcast version means broadcast to all nodes
case broadcastVersion: case broadcastVersion:
var success bool var success bool
// publish to all nodes // publish to all nodes
for _, node := range service.Nodes { for _, node := range nodes {
// publish async // publish async
if err := pub(node, topic, b); err == nil { if err := pub(node, topic, b); err == nil {
success = true success = true
@ -584,7 +600,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
} }
default: default:
// select node to publish to // select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)] node := nodes[rand.Int()%len(nodes)]
// publish async to one node // publish async to one node
if err := pub(node, topic, b); err != nil { if err := pub(node, topic, b); err != nil {
@ -647,6 +663,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
Address: mnet.HostPort(addr, port), Address: mnet.HostPort(addr, port),
Metadata: map[string]string{ Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure), "secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
}, },
} }
@ -657,7 +675,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
} }
service := &registry.Service{ service := &registry.Service{
Name: "topic:" + topic, Name: topic,
Version: version, Version: version,
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }