Fix up the http broker so it works concurrently and pubs async
This commit is contained in:
		| @@ -288,8 +288,11 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |||||||
| 	id := req.Form.Get("id") | 	id := req.Form.Get("id") | ||||||
|  |  | ||||||
| 	h.RLock() | 	h.RLock() | ||||||
|  |  | ||||||
| 	for _, subscriber := range h.subscribers[topic] { | 	for _, subscriber := range h.subscribers[topic] { | ||||||
| 		if id == subscriber.id { | 		if id == subscriber.id { | ||||||
|  | 			// sub is sync; crufty rate limiting | ||||||
|  | 			// so we don't hose the cpu | ||||||
| 			subscriber.fn(p) | 			subscriber.fn(p) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -347,7 +350,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fn := func(node *registry.Node, b io.Reader) { | 	fn := func(node *registry.Node, b []byte) { | ||||||
| 		scheme := "http" | 		scheme := "http" | ||||||
|  |  | ||||||
| 		// check if secure is added in metadata | 		// check if secure is added in metadata | ||||||
| @@ -357,36 +360,31 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) | |||||||
|  |  | ||||||
| 		vals := url.Values{} | 		vals := url.Values{} | ||||||
| 		vals.Add("id", node.Id) | 		vals.Add("id", node.Id) | ||||||
|  |  | ||||||
| 		uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) | 		uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) | ||||||
| 		r, err := h.c.Post(uri, "application/json", b) | 		r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
|  | 			io.Copy(ioutil.Discard, r.Body) | ||||||
| 			r.Body.Close() | 			r.Body.Close() | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	buf := bytes.NewBuffer(nil) |  | ||||||
|  |  | ||||||
| 	for _, service := range s { | 	for _, service := range s { | ||||||
| 		// broadcast version means broadcast to all nodes | 		// broadcast version means broadcast to all nodes | ||||||
| 		if service.Version == broadcastVersion { | 		if service.Version == broadcastVersion { | ||||||
| 			for _, node := range service.Nodes { | 			for _, node := range service.Nodes { | ||||||
| 				buf.Reset() | 				// publish async | ||||||
| 				buf.Write(b) | 				go fn(node, b) | ||||||
| 				fn(node, buf) |  | ||||||
| 			} | 			} | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		node := service.Nodes[rand.Int()%len(service.Nodes)] | 		node := service.Nodes[rand.Int()%len(service.Nodes)] | ||||||
| 		buf.Reset() | 		// publish async | ||||||
| 		buf.Write(b) | 		go fn(node, b) | ||||||
| 		fn(node, buf) |  | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	buf.Reset() |  | ||||||
| 	buf = nil |  | ||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| package broker | package broker | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"sync" | ||||||
| 	"testing" | 	"testing" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/registry/mock" | 	"github.com/micro/go-micro/registry/mock" | ||||||
| @@ -29,7 +30,6 @@ func TestBroker(t *testing.T) { | |||||||
|  |  | ||||||
| 	sub, err := b.Subscribe("test", func(p Publication) error { | 	sub, err := b.Subscribe("test", func(p Publication) error { | ||||||
| 		m := p.Message() | 		m := p.Message() | ||||||
| 		t.Logf("Received message %+v", m) |  | ||||||
|  |  | ||||||
| 		if string(m.Body) != string(msg.Body) { | 		if string(m.Body) != string(msg.Body) { | ||||||
| 			t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) | 			t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) | ||||||
| @@ -53,3 +53,113 @@ func TestBroker(t *testing.T) { | |||||||
| 		t.Errorf("Unexpected disconnect error: %v", err) | 		t.Errorf("Unexpected disconnect error: %v", err) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestConcurrentSubBroker(t *testing.T) { | ||||||
|  | 	m := mock.NewRegistry() | ||||||
|  | 	b := NewBroker(Registry(m)) | ||||||
|  |  | ||||||
|  | 	if err := b.Init(); err != nil { | ||||||
|  | 		t.Errorf("Unexpected init error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := b.Connect(); err != nil { | ||||||
|  | 		t.Errorf("Unexpected connect error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	msg := &Message{ | ||||||
|  | 		Header: map[string]string{ | ||||||
|  | 			"Content-Type": "application/json", | ||||||
|  | 		}, | ||||||
|  | 		Body: []byte(`{"message": "Hello World"}`), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var subs []Subscriber | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  |  | ||||||
|  | 	for i := 0; i < 10; i++ { | ||||||
|  | 		sub, err := b.Subscribe("test", func(p Publication) error { | ||||||
|  | 			defer wg.Done() | ||||||
|  |  | ||||||
|  | 			m := p.Message() | ||||||
|  |  | ||||||
|  | 			if string(m.Body) != string(msg.Body) { | ||||||
|  | 				t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			return nil | ||||||
|  | 		}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			t.Errorf("Unexpected subscribe error: %v", err) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		wg.Add(1) | ||||||
|  | 		subs = append(subs, sub) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := b.Publish("test", msg); err != nil { | ||||||
|  | 		t.Errorf("Unexpected publish error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | 	for _, sub := range subs { | ||||||
|  | 		sub.Unsubscribe() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := b.Disconnect(); err != nil { | ||||||
|  | 		t.Errorf("Unexpected disconnect error: %v", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestConcurrentPubBroker(t *testing.T) { | ||||||
|  | 	m := mock.NewRegistry() | ||||||
|  | 	b := NewBroker(Registry(m)) | ||||||
|  |  | ||||||
|  | 	if err := b.Init(); err != nil { | ||||||
|  | 		t.Errorf("Unexpected init error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := b.Connect(); err != nil { | ||||||
|  | 		t.Errorf("Unexpected connect error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	msg := &Message{ | ||||||
|  | 		Header: map[string]string{ | ||||||
|  | 			"Content-Type": "application/json", | ||||||
|  | 		}, | ||||||
|  | 		Body: []byte(`{"message": "Hello World"}`), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  |  | ||||||
|  | 	sub, err := b.Subscribe("test", func(p Publication) error { | ||||||
|  | 		defer wg.Done() | ||||||
|  |  | ||||||
|  | 		m := p.Message() | ||||||
|  |  | ||||||
|  | 		if string(m.Body) != string(msg.Body) { | ||||||
|  | 			t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Errorf("Unexpected subscribe error: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for i := 0; i < 10; i++ { | ||||||
|  | 		wg.Add(1) | ||||||
|  |  | ||||||
|  | 		if err := b.Publish("test", msg); err != nil { | ||||||
|  | 			t.Errorf("Unexpected publish error: %v", err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | 	sub.Unsubscribe() | ||||||
|  |  | ||||||
|  | 	if err := b.Disconnect(); err != nil { | ||||||
|  | 		t.Errorf("Unexpected disconnect error: %v", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -125,6 +125,10 @@ func pingPong(i int) { | |||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
| 	cmd.Init() | 	cmd.Init() | ||||||
|  |  | ||||||
|  | 	fmt.Println("\n--- Publisher example ---\n") | ||||||
|  | 	pub() | ||||||
|  |  | ||||||
| 	fmt.Println("\n--- Call example ---\n") | 	fmt.Println("\n--- Call example ---\n") | ||||||
| 	for i := 0; i < 10; i++ { | 	for i := 0; i < 10; i++ { | ||||||
| 		call(i) | 		call(i) | ||||||
| @@ -136,6 +140,4 @@ func main() { | |||||||
| 	fmt.Println("\n--- Ping Pong example ---\n") | 	fmt.Println("\n--- Ping Pong example ---\n") | ||||||
| 	pingPong(10) | 	pingPong(10) | ||||||
|  |  | ||||||
| 	fmt.Println("\n--- Publisher example ---\n") |  | ||||||
| 	pub() |  | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user