Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
31
kgo_test.go
31
kgo_test.go
@@ -9,11 +9,11 @@ import (
|
||||
"time"
|
||||
|
||||
kg "github.com/twmb/franz-go/pkg/kgo"
|
||||
kgo "go.unistack.org/micro-broker-kgo/v3"
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/logger/slog"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
kgo "go.unistack.org/micro-broker-kgo/v4"
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/logger/slog"
|
||||
"go.unistack.org/micro/v4/metadata"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -23,11 +23,6 @@ var (
|
||||
loglevel = logger.DebugLevel
|
||||
)
|
||||
|
||||
var bm = &broker.Message{
|
||||
Header: map[string]string{"hkey": "hval", metadata.HeaderTopic: "test"},
|
||||
Body: []byte(`"body"`),
|
||||
}
|
||||
|
||||
func TestFail(t *testing.T) {
|
||||
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
|
||||
t.Skip()
|
||||
@@ -72,9 +67,10 @@ func TestFail(t *testing.T) {
|
||||
}()
|
||||
|
||||
t.Logf("broker health %v", b.Health())
|
||||
msgs := make([]*broker.Message, 0, msgcnt)
|
||||
msgs := make([]broker.Message, 0, msgcnt)
|
||||
for i := int64(0); i < msgcnt; i++ {
|
||||
msgs = append(msgs, bm)
|
||||
m, _ := b.NewMessage(ctx, metadata.Pairs("hkey", "hval"), []byte(`test`))
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
@@ -86,7 +82,7 @@ func TestFail(t *testing.T) {
|
||||
// t.Skip()
|
||||
|
||||
idx := int64(0)
|
||||
fn := func(msg broker.Event) error {
|
||||
fn := func(msg broker.Message) error {
|
||||
atomic.AddInt64(&idx, 1)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
t.Logf("ack")
|
||||
@@ -170,19 +166,20 @@ func TestPubSub(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
if prefill {
|
||||
msgs := make([]*broker.Message, 0, msgcnt)
|
||||
msgs := make([]broker.Message, 0, msgcnt)
|
||||
for i := int64(0); i < msgcnt; i++ {
|
||||
msgs = append(msgs, bm)
|
||||
m, _ := b.NewMessage(ctx, metadata.Pairs("hkey", "hval"), []byte(`test`))
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
|
||||
if err := b.BatchPublish(ctx, msgs); err != nil {
|
||||
if err := b.Publish(ctx, "test", msgs...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// t.Skip()
|
||||
}
|
||||
done := make(chan bool, 1)
|
||||
idx := int64(0)
|
||||
fn := func(msg broker.Event) error {
|
||||
fn := func(msg broker.Message) error {
|
||||
atomic.AddInt64(&idx, 1)
|
||||
// time.Sleep(200 * time.Millisecond)
|
||||
return msg.Ack()
|
||||
|
Reference in New Issue
Block a user