diff --git a/broker/http_broker.go b/broker/http_broker.go index 385c7fa0..b6c78b62 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -64,6 +64,7 @@ type httpEvent struct { var ( DefaultSubPath = "/_sub" + serviceName = "go.micro.http.broker" broadcastVersion = "ff.http.broadcast" registerTTL = time.Minute registerInterval = time.Second * 30 @@ -126,7 +127,7 @@ func newHttpBroker(opts ...Option) Broker { } h := &httpBroker{ - id: "go.micro.http.broker-" + uuid.New().String(), + id: uuid.New().String(), address: addr, opts: options, r: reg, @@ -241,7 +242,7 @@ func (h *httpBroker) unsubscribe(s *httpSubscriber) error { // look for subscriber for _, sub := range h.subscribers[s.topic] { // deregister and skip forward - if sub.id == s.id { + if sub == s { _ = h.r.Deregister(sub.svc) continue } @@ -527,7 +528,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) // now attempt to get the service h.RLock() - s, err := h.r.GetService(topic) + s, err := h.r.GetService(serviceName) if err != nil { h.RUnlock() // ignore error @@ -656,7 +657,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO // register service node := ®istry.Node{ - Id: h.id, + Id: topic + "-" + h.id, Address: mnet.HostPort(addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), @@ -672,7 +673,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO } service := ®istry.Service{ - Name: topic, + Name: serviceName, Version: version, Nodes: []*registry.Node{node}, } @@ -681,7 +682,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO subscriber := &httpSubscriber{ opts: options, hb: h, - id: h.id, + id: node.Id, topic: topic, fn: handler, svc: service,