strip topic from http broker subscribe service name
This commit is contained in:
		| @@ -527,7 +527,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
|  |  | ||||||
| 	// now attempt to get the service | 	// now attempt to get the service | ||||||
| 	h.RLock() | 	h.RLock() | ||||||
| 	s, err := h.r.GetService("topic:" + topic) | 	s, err := h.r.GetService(topic) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		h.RUnlock() | 		h.RUnlock() | ||||||
| 		// ignore error | 		// ignore error | ||||||
| @@ -565,13 +565,29 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | 			var nodes []*registry.Node | ||||||
|  |  | ||||||
|  | 			for _, node := range service.Nodes { | ||||||
|  | 				// only use nodes tagged with broker http | ||||||
|  | 				if node.Metadata["broker"] != "http" { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				// look for nodes for the topic | ||||||
|  | 				if node.Metadata["topic"] != topic { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				nodes = append(nodes, node) | ||||||
|  | 			} | ||||||
|  |  | ||||||
| 			switch service.Version { | 			switch service.Version { | ||||||
| 			// broadcast version means broadcast to all nodes | 			// broadcast version means broadcast to all nodes | ||||||
| 			case broadcastVersion: | 			case broadcastVersion: | ||||||
| 				var success bool | 				var success bool | ||||||
|  |  | ||||||
| 				// publish to all nodes | 				// publish to all nodes | ||||||
| 				for _, node := range service.Nodes { | 				for _, node := range nodes { | ||||||
| 					// publish async | 					// publish async | ||||||
| 					if err := pub(node, topic, b); err == nil { | 					if err := pub(node, topic, b); err == nil { | ||||||
| 						success = true | 						success = true | ||||||
| @@ -584,7 +600,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
| 				} | 				} | ||||||
| 			default: | 			default: | ||||||
| 				// select node to publish to | 				// select node to publish to | ||||||
| 				node := service.Nodes[rand.Int()%len(service.Nodes)] | 				node := nodes[rand.Int()%len(nodes)] | ||||||
|  |  | ||||||
| 				// publish async to one node | 				// publish async to one node | ||||||
| 				if err := pub(node, topic, b); err != nil { | 				if err := pub(node, topic, b); err != nil { | ||||||
| @@ -647,6 +663,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO | |||||||
| 		Address: mnet.HostPort(addr, port), | 		Address: mnet.HostPort(addr, port), | ||||||
| 		Metadata: map[string]string{ | 		Metadata: map[string]string{ | ||||||
| 			"secure": fmt.Sprintf("%t", secure), | 			"secure": fmt.Sprintf("%t", secure), | ||||||
|  | 			"broker": "http", | ||||||
|  | 			"topic":  topic, | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -657,7 +675,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	service := ®istry.Service{ | 	service := ®istry.Service{ | ||||||
| 		Name:    "topic:" + topic, | 		Name:    topic, | ||||||
| 		Version: version, | 		Version: version, | ||||||
| 		Nodes:   []*registry.Node{node}, | 		Nodes:   []*registry.Node{node}, | ||||||
| 	} | 	} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user