From 796dba3aac918125dacfc5898a3a336e5e45717d Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 28 Apr 2016 00:16:11 +0100 Subject: [PATCH] Fix up the http broker so it works concurrently and pubs async --- broker/http_broker.go | 24 ++++---- broker/http_broker_test.go | 112 ++++++++++++++++++++++++++++++++++++- examples/client/main.go | 6 +- 3 files changed, 126 insertions(+), 16 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index 293fa2f4..ea54aa8c 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -288,8 +288,11 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { id := req.Form.Get("id") h.RLock() + for _, subscriber := range h.subscribers[topic] { if id == subscriber.id { + // sub is sync; crufty rate limiting + // so we don't hose the cpu subscriber.fn(p) } } @@ -347,7 +350,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) return err } - fn := func(node *registry.Node, b io.Reader) { + fn := func(node *registry.Node, b []byte) { scheme := "http" // check if secure is added in metadata @@ -357,36 +360,31 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) vals := url.Values{} vals.Add("id", node.Id) + uri := fmt.Sprintf("%s://%s:%d%s?%s", scheme, node.Address, node.Port, DefaultSubPath, vals.Encode()) - r, err := h.c.Post(uri, "application/json", b) + r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) if err == nil { + io.Copy(ioutil.Discard, r.Body) r.Body.Close() } } - buf := bytes.NewBuffer(nil) - for _, service := range s { // broadcast version means broadcast to all nodes if service.Version == broadcastVersion { for _, node := range service.Nodes { - buf.Reset() - buf.Write(b) - fn(node, buf) + // publish async + go fn(node, b) } return nil } node := service.Nodes[rand.Int()%len(service.Nodes)] - buf.Reset() - buf.Write(b) - fn(node, buf) + // publish async + go fn(node, b) return nil } - buf.Reset() - buf = nil - return nil } diff --git a/broker/http_broker_test.go b/broker/http_broker_test.go index af71032b..d25783ea 100644 --- a/broker/http_broker_test.go +++ b/broker/http_broker_test.go @@ -1,6 +1,7 @@ package broker import ( + "sync" "testing" "github.com/micro/go-micro/registry/mock" @@ -29,7 +30,6 @@ func TestBroker(t *testing.T) { sub, err := b.Subscribe("test", func(p Publication) error { m := p.Message() - t.Logf("Received message %+v", m) if string(m.Body) != string(msg.Body) { t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) @@ -53,3 +53,113 @@ func TestBroker(t *testing.T) { t.Errorf("Unexpected disconnect error: %v", err) } } + +func TestConcurrentSubBroker(t *testing.T) { + m := mock.NewRegistry() + b := NewBroker(Registry(m)) + + if err := b.Init(); err != nil { + t.Errorf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + t.Errorf("Unexpected connect error: %v", err) + } + + msg := &Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + var subs []Subscriber + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + sub, err := b.Subscribe("test", func(p Publication) error { + defer wg.Done() + + m := p.Message() + + if string(m.Body) != string(msg.Body) { + t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + + return nil + }) + if err != nil { + t.Errorf("Unexpected subscribe error: %v", err) + } + + wg.Add(1) + subs = append(subs, sub) + } + + if err := b.Publish("test", msg); err != nil { + t.Errorf("Unexpected publish error: %v", err) + } + + wg.Wait() + + for _, sub := range subs { + sub.Unsubscribe() + } + + if err := b.Disconnect(); err != nil { + t.Errorf("Unexpected disconnect error: %v", err) + } +} + +func TestConcurrentPubBroker(t *testing.T) { + m := mock.NewRegistry() + b := NewBroker(Registry(m)) + + if err := b.Init(); err != nil { + t.Errorf("Unexpected init error: %v", err) + } + + if err := b.Connect(); err != nil { + t.Errorf("Unexpected connect error: %v", err) + } + + msg := &Message{ + Header: map[string]string{ + "Content-Type": "application/json", + }, + Body: []byte(`{"message": "Hello World"}`), + } + + var wg sync.WaitGroup + + sub, err := b.Subscribe("test", func(p Publication) error { + defer wg.Done() + + m := p.Message() + + if string(m.Body) != string(msg.Body) { + t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) + } + + return nil + }) + if err != nil { + t.Errorf("Unexpected subscribe error: %v", err) + } + + for i := 0; i < 10; i++ { + wg.Add(1) + + if err := b.Publish("test", msg); err != nil { + t.Errorf("Unexpected publish error: %v", err) + } + } + + wg.Wait() + + sub.Unsubscribe() + + if err := b.Disconnect(); err != nil { + t.Errorf("Unexpected disconnect error: %v", err) + } +} diff --git a/examples/client/main.go b/examples/client/main.go index 3c86d94e..ba2704ae 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -125,6 +125,10 @@ func pingPong(i int) { func main() { cmd.Init() + + fmt.Println("\n--- Publisher example ---\n") + pub() + fmt.Println("\n--- Call example ---\n") for i := 0; i < 10; i++ { call(i) @@ -136,6 +140,4 @@ func main() { fmt.Println("\n--- Ping Pong example ---\n") pingPong(10) - fmt.Println("\n--- Publisher example ---\n") - pub() }