Fix http broker to allow multiple nodes for subscription, total hack

This commit is contained in:
Asim 2015-12-27 01:17:53 +00:00
parent 3dd911fb33
commit d0b2b612f3

View File

@ -260,15 +260,18 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
opt := newSubscribeOptions(opts...) opt := newSubscribeOptions(opts...)
fmt.Println("subscribe to", topic)
// parse address for host, port // parse address for host, port
parts := strings.Split(h.Address(), ":") parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":") host := strings.Join(parts[:len(parts)-1], ":")
port, _ := strconv.Atoi(parts[len(parts)-1]) port, _ := strconv.Atoi(parts[len(parts)-1])
id := uuid.NewUUID().String()
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: h.id, Id: topic + "." + h.id + "." + id,
Address: host, Address: host,
Port: port, Port: port,
} }
@ -286,7 +289,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
subscriber := &httpSubscriber{ subscriber := &httpSubscriber{
opts: opt, opts: opt,
id: uuid.NewUUID().String(), id: id,
topic: topic, topic: topic,
ch: h.unsubscribe, ch: h.unsubscribe,
fn: handler, fn: handler,
@ -299,8 +302,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
h.Lock() h.Lock()
h.subscribers[topic] = append(h.subscribers[topic], subscriber) h.subscribers[topic] = append(h.subscribers[topic], subscriber)
fmt.Println(h.subscribers)
h.Unlock() h.Unlock()
return subscriber, nil return subscriber, nil
} }