why didn't we think of this before...single service name for http broker

This commit is contained in:
Asim Aslam 2019-11-30 21:00:36 +00:00
parent df7169c9f2
commit ce1942c578

View File

@ -64,6 +64,7 @@ type httpEvent struct {
var ( var (
DefaultSubPath = "/_sub" DefaultSubPath = "/_sub"
serviceName = "go.micro.http.broker"
broadcastVersion = "ff.http.broadcast" broadcastVersion = "ff.http.broadcast"
registerTTL = time.Minute registerTTL = time.Minute
registerInterval = time.Second * 30 registerInterval = time.Second * 30
@ -126,7 +127,7 @@ func newHttpBroker(opts ...Option) Broker {
} }
h := &httpBroker{ h := &httpBroker{
id: "go.micro.http.broker-" + uuid.New().String(), id: uuid.New().String(),
address: addr, address: addr,
opts: options, opts: options,
r: reg, r: reg,
@ -241,7 +242,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
// look for subscriber // look for subscriber
for _, sub := range h.subscribers[s.topic] { for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward // deregister and skip forward
if sub.id == s.id { if sub == s {
_ = h.r.Deregister(sub.svc) _ = h.r.Deregister(sub.svc)
continue continue
} }
@ -527,7 +528,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
// now attempt to get the service // now attempt to get the service
h.RLock() h.RLock()
s, err := h.r.GetService(topic) s, err := h.r.GetService(serviceName)
if err != nil { if err != nil {
h.RUnlock() h.RUnlock()
// ignore error // ignore error
@ -656,7 +657,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: h.id, Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port), Address: mnet.HostPort(addr, port),
Metadata: map[string]string{ Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure), "secure": fmt.Sprintf("%t", secure),
@ -672,7 +673,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
} }
service := &registry.Service{ service := &registry.Service{
Name: topic, Name: serviceName,
Version: version, Version: version,
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
@ -681,7 +682,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
subscriber := &httpSubscriber{ subscriber := &httpSubscriber{
opts: options, opts: options,
hb: h, hb: h,
id: h.id, id: node.Id,
topic: topic, topic: topic,
fn: handler, fn: handler,
svc: service, svc: service,