micro/broker/memory_test.go

108 lines
2.2 KiB
Go
Raw Normal View History

package broker
import (
"context"
"fmt"
"testing"
"go.unistack.org/micro/v4/metadata"
)
// TestMemoryBatchBroker exercises the broker returned by NewBroker with a
// batch handler (func([]Message) error): it connects, subscribes, publishes
// count messages in a single Publish call, then unsubscribes and disconnects.
// Any error returned by one of those steps fails the test immediately.
func TestMemoryBatchBroker(t *testing.T) {
	b := NewBroker()
	ctx := context.Background()

	if err := b.Connect(ctx); err != nil {
		t.Fatalf("Unexpected connect error %v", err)
	}

	topic := "test"
	count := 10

	// The batch handler acknowledges every message it is handed and stops at
	// the first Ack failure, returning that error.
	fn := func(evts []Message) error {
		for _, evt := range evts {
			if err := evt.Ack(); err != nil {
				return err
			}
		}
		return nil
	}

	sub, err := b.Subscribe(ctx, topic, fn)
	if err != nil {
		t.Fatalf("Unexpected error subscribing %v", err)
	}

	// Build the whole batch up front; each message carries the topic header
	// plus a distinct "id" header so the messages are distinguishable.
	msgs := make([]Message, 0, count)
	for i := 0; i < count; i++ {
		message := &memoryMessage{
			header: metadata.Metadata{
				metadata.HeaderTopic: []string{topic},
				"foo":                []string{"bar"},
				"id":                 []string{fmt.Sprintf("%d", i)},
			},
			body: []byte(`"hello world"`),
		}
		msgs = append(msgs, message)
	}

	if err := b.Publish(ctx, msgs); err != nil {
		t.Fatalf("Unexpected error publishing %v", err)
	}

	if err := sub.Unsubscribe(ctx); err != nil {
		t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
	}

	// Report disconnect failures as such (the message previously said
	// "connect", which pointed at the wrong step).
	if err := b.Disconnect(ctx); err != nil {
		t.Fatalf("Unexpected disconnect error %v", err)
	}
}
// TestMemoryBroker exercises the broker returned by NewBroker with a
// single-message handler (func(Message) error): it connects, subscribes,
// publishes count messages in a single Publish call, then unsubscribes and
// disconnects. Any error returned by one of those steps fails the test
// immediately.
func TestMemoryBroker(t *testing.T) {
	b := NewBroker()
	ctx := context.Background()

	if err := b.Connect(ctx); err != nil {
		t.Fatalf("Unexpected connect error %v", err)
	}

	topic := "test"
	count := 10

	// The handler simply acknowledges the message it is given.
	fn := func(p Message) error {
		return p.Ack()
	}

	sub, err := b.Subscribe(ctx, topic, fn)
	if err != nil {
		t.Fatalf("Unexpected error subscribing %v", err)
	}

	// Each message carries the topic header plus a distinct "id" header so
	// the messages are distinguishable.
	msgs := make([]Message, 0, count)
	for i := 0; i < count; i++ {
		message := &memoryMessage{
			header: metadata.Metadata{
				metadata.HeaderTopic: []string{topic},
				"foo":                []string{"bar"},
				"id":                 []string{fmt.Sprintf("%d", i)},
			},
			body: []byte(`"hello world"`),
		}
		msgs = append(msgs, message)
	}

	if err := b.Publish(ctx, msgs); err != nil {
		t.Fatalf("Unexpected error publishing %v", err)
	}

	if err := sub.Unsubscribe(ctx); err != nil {
		t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err)
	}

	// Report disconnect failures as such (the message previously said
	// "connect", which pointed at the wrong step).
	if err := b.Disconnect(ctx); err != nil {
		t.Fatalf("Unexpected disconnect error %v", err)
	}
}