add http broker benchmark so we can test codec changes

This commit is contained in:
Asim 2016-06-25 03:15:56 +01:00
parent 0348d0eed9
commit 1cdf98418f

View File

@ -3,20 +3,151 @@ package broker
import ( import (
"sync" "sync"
"testing" "testing"
"time"
"github.com/micro/go-micro/registry/mock" "github.com/micro/go-micro/registry/mock"
"github.com/pborman/uuid"
) )
func sub(be *testing.B, c int) {
be.StopTimer()
m := mock.NewRegistry()
b := NewBroker(Registry(m))
topic := uuid.NewUUID().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
be.Fatalf("Unexpected connect error: %v", err)
}
msg := &Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
var subs []Subscriber
done := make(chan bool, c)
for i := 0; i < c; i++ {
sub, err := b.Subscribe(topic, func(p Publication) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
return nil
}, Queue("shared"))
if err != nil {
be.Fatalf("Unexpected subscribe error: %v", err)
}
subs = append(subs, sub)
}
for i := 0; i < be.N; i++ {
be.StartTimer()
if err := b.Publish(topic, msg); err != nil {
be.Fatalf("Unexpected publish error: %v", err)
}
<-done
be.StopTimer()
}
for _, sub := range subs {
sub.Unsubscribe()
}
if err := b.Disconnect(); err != nil {
be.Fatalf("Unexpected disconnect error: %v", err)
}
}
func pub(be *testing.B, c int) {
be.StopTimer()
m := mock.NewRegistry()
b := NewBroker(Registry(m))
topic := uuid.NewUUID().String()
if err := b.Init(); err != nil {
be.Fatalf("Unexpected init error: %v", err)
}
if err := b.Connect(); err != nil {
be.Fatalf("Unexpected connect error: %v", err)
}
msg := &Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
done := make(chan bool, c*4)
sub, err := b.Subscribe(topic, func(p Publication) error {
done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
}
return nil
}, Queue("shared"))
if err != nil {
be.Fatalf("Unexpected subscribe error: %v", err)
}
var wg sync.WaitGroup
ch := make(chan int, c*4)
be.StartTimer()
for i := 0; i < c; i++ {
go func() {
for _ = range ch {
if err := b.Publish(topic, msg); err != nil {
be.Fatalf("Unexpected publish error: %v", err)
}
select {
case <-done:
case <-time.After(time.Second):
}
wg.Done()
}
}()
}
for i := 0; i < be.N; i++ {
wg.Add(1)
ch <- i
}
wg.Wait()
be.StopTimer()
sub.Unsubscribe()
close(ch)
close(done)
if err := b.Disconnect(); err != nil {
be.Fatalf("Unexpected disconnect error: %v", err)
}
}
func TestBroker(t *testing.T) { func TestBroker(t *testing.T) {
m := mock.NewRegistry() m := mock.NewRegistry()
b := NewBroker(Registry(m)) b := NewBroker(Registry(m))
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Errorf("Unexpected init error: %v", err) t.Fatalf("Unexpected init error: %v", err)
} }
if err := b.Connect(); err != nil { if err := b.Connect(); err != nil {
t.Errorf("Unexpected connect error: %v", err) t.Fatalf("Unexpected connect error: %v", err)
} }
msg := &Message{ msg := &Message{
@ -32,25 +163,25 @@ func TestBroker(t *testing.T) {
m := p.Message() m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
} }
close(done) close(done)
return nil return nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected subscribe error: %v", err) t.Fatalf("Unexpected subscribe error: %v", err)
} }
if err := b.Publish("test", msg); err != nil { if err := b.Publish("test", msg); err != nil {
t.Errorf("Unexpected publish error: %v", err) t.Fatalf("Unexpected publish error: %v", err)
} }
<-done <-done
sub.Unsubscribe() sub.Unsubscribe()
if err := b.Disconnect(); err != nil { if err := b.Disconnect(); err != nil {
t.Errorf("Unexpected disconnect error: %v", err) t.Fatalf("Unexpected disconnect error: %v", err)
} }
} }
@ -59,11 +190,11 @@ func TestConcurrentSubBroker(t *testing.T) {
b := NewBroker(Registry(m)) b := NewBroker(Registry(m))
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Errorf("Unexpected init error: %v", err) t.Fatalf("Unexpected init error: %v", err)
} }
if err := b.Connect(); err != nil { if err := b.Connect(); err != nil {
t.Errorf("Unexpected connect error: %v", err) t.Fatalf("Unexpected connect error: %v", err)
} }
msg := &Message{ msg := &Message{
@ -83,13 +214,13 @@ func TestConcurrentSubBroker(t *testing.T) {
m := p.Message() m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
} }
return nil return nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected subscribe error: %v", err) t.Fatalf("Unexpected subscribe error: %v", err)
} }
wg.Add(1) wg.Add(1)
@ -97,7 +228,7 @@ func TestConcurrentSubBroker(t *testing.T) {
} }
if err := b.Publish("test", msg); err != nil { if err := b.Publish("test", msg); err != nil {
t.Errorf("Unexpected publish error: %v", err) t.Fatalf("Unexpected publish error: %v", err)
} }
wg.Wait() wg.Wait()
@ -107,7 +238,7 @@ func TestConcurrentSubBroker(t *testing.T) {
} }
if err := b.Disconnect(); err != nil { if err := b.Disconnect(); err != nil {
t.Errorf("Unexpected disconnect error: %v", err) t.Fatalf("Unexpected disconnect error: %v", err)
} }
} }
@ -116,11 +247,11 @@ func TestConcurrentPubBroker(t *testing.T) {
b := NewBroker(Registry(m)) b := NewBroker(Registry(m))
if err := b.Init(); err != nil { if err := b.Init(); err != nil {
t.Errorf("Unexpected init error: %v", err) t.Fatalf("Unexpected init error: %v", err)
} }
if err := b.Connect(); err != nil { if err := b.Connect(); err != nil {
t.Errorf("Unexpected connect error: %v", err) t.Fatalf("Unexpected connect error: %v", err)
} }
msg := &Message{ msg := &Message{
@ -138,20 +269,20 @@ func TestConcurrentPubBroker(t *testing.T) {
m := p.Message() m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
t.Errorf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
} }
return nil return nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected subscribe error: %v", err) t.Fatalf("Unexpected subscribe error: %v", err)
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
wg.Add(1) wg.Add(1)
if err := b.Publish("test", msg); err != nil { if err := b.Publish("test", msg); err != nil {
t.Errorf("Unexpected publish error: %v", err) t.Fatalf("Unexpected publish error: %v", err)
} }
} }
@ -160,6 +291,45 @@ func TestConcurrentPubBroker(t *testing.T) {
sub.Unsubscribe() sub.Unsubscribe()
if err := b.Disconnect(); err != nil { if err := b.Disconnect(); err != nil {
t.Errorf("Unexpected disconnect error: %v", err) t.Fatalf("Unexpected disconnect error: %v", err)
} }
} }
func BenchmarkSub1(b *testing.B) {
sub(b, 1)
}
func BenchmarkSub8(b *testing.B) {
sub(b, 8)
}
func BenchmarkSub32(b *testing.B) {
sub(b, 32)
}
func BenchmarkSub64(b *testing.B) {
sub(b, 64)
}
func BenchmarkSub128(b *testing.B) {
sub(b, 128)
}
func BenchmarkPub1(b *testing.B) {
pub(b, 1)
}
func BenchmarkPub8(b *testing.B) {
pub(b, 8)
}
func BenchmarkPub32(b *testing.B) {
pub(b, 32)
}
func BenchmarkPub64(b *testing.B) {
pub(b, 64)
}
func BenchmarkPub128(b *testing.B) {
pub(b, 128)
}