update all

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-07-31 15:17:18 +03:00
parent f1db855826
commit 03fe6da356
8 changed files with 230 additions and 475 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
@@ -36,6 +37,157 @@ var (
}
)
func TestConsumerGroup(t *testing.T) {
topic := fmt.Sprintf("test_topic")
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
t.Skip()
}
logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel))
ctx := context.Background()
var addrs []string
if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
addrs = []string{"127.0.0.1:9092"}
} else {
addrs = strings.Split(addr, ",")
}
meter.DefaultMeter = victoriameter.NewMeter()
s := https.NewServer(server.Context(ctx), server.Address("127.0.0.1:0"), server.Codec("text/plain", codec.NewCodec()))
if err := s.Init(); err != nil {
t.Fatal(err)
}
if err := meterhandler.RegisterMeterServer(s, meterhandler.NewHandler()); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}
defer func() {
if err := s.Stop(); err != nil {
t.Fatal(err)
}
}()
segmentio.DefaultWriterConfig.Async = true
segmentio.DefaultWriterConfig.BatchTimeout = 1 * time.Second
segmentio.DefaultWriterConfig.RequiredAcks = int(kafka.RequireAll)
segmentio.DefaultReaderConfig.StartOffset = kafka.FirstOffset
segmentio.DefaultReaderConfig.MinBytes = 1024 * 10 // 10 kb
segmentio.DefaultReaderConfig.MaxBytes = 1024 * 1024 * 20 // 20 Mb
segmentio.DefaultReaderConfig.MaxWait = 20 * time.Second // 20s
segmentio.DefaultReaderConfig.QueueCapacity = 500
segmentio.DefaultReaderConfig.ReadBackoffMin = 2 * time.Second
segmentio.DefaultReaderConfig.ReadBackoffMax = 5 * time.Second
segmentio.DefaultReaderConfig.Logger = &lg{}
segmentio.DefaultReaderConfig.CommitInterval = 1 * time.Second
brk := segmentio.NewBroker(broker.Context(ctx), broker.Addrs(addrs...), segmentio.StatsInterval(5*time.Second),
segmentio.ClientID("test_sub"),
)
t.Logf("init")
if err := brk.Init(); err != nil {
t.Fatal(err)
}
t.Logf("connect")
if err := brk.Connect(ctx); err != nil {
t.Fatal(err)
}
defer func() {
t.Logf("disconnect")
if err := brk.Disconnect(ctx); err != nil {
t.Fatal(err)
}
}()
fmt.Printf("prefill topic\n")
go func() {
for i := 0; i < 900000; i++ {
if err := brk.Publish(ctx, topic, bm); err != nil {
t.Fatal(err)
}
//log.Printf("publish %d", i)
time.Sleep(200 * time.Millisecond)
}
}()
fmt.Printf("prefill complete\n")
var cnt uint64
var wait atomic.Value
wait.Store(true)
fn := func(msg broker.Event) error {
if wait.Load().(bool) {
wait.Store(false)
}
atomic.AddUint64(&cnt, 1)
return msg.Ack()
}
sub1, err := brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
if err != nil {
t.Fatal(err)
}
sub2, err := brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
if err != nil {
t.Fatal(err)
}
fmt.Printf("wait for ready\n")
for {
if !wait.Load().(bool) {
break
}
time.Sleep(1 * time.Second)
}
time.Sleep(5 * time.Second)
fmt.Printf("unsub\n")
if err := sub1.Unsubscribe(ctx); err != nil {
t.Fatal(err)
}
time.Sleep(9 * time.Second)
sub1, err = brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
if err != nil {
t.Fatal(err)
}
t1 := time.NewTicker(10 * time.Second)
defer t1.Stop()
t2 := time.NewTicker(30 * time.Second)
defer t2.Stop()
for {
select {
case <-t1.C:
fmt.Printf("unsub from sub2\n")
if err := sub2.Unsubscribe(ctx); err != nil {
t.Fatal(err)
}
fmt.Printf("sub1\n")
sub1, err = brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
if err != nil {
t.Fatal(err)
}
case <-t2.C:
fmt.Printf("unsub from sub1\n")
if err := sub1.Unsubscribe(ctx); err != nil {
t.Fatal(err)
}
fmt.Printf("sub2\n")
sub2, err = brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
if err != nil {
t.Fatal(err)
}
}
}
}
func TestSub(t *testing.T) {
topic := fmt.Sprintf("test_topic")
if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
@@ -106,8 +258,11 @@ func TestSub(t *testing.T) {
fmt.Printf("prefill topic\n")
go func() {
for i := 0; i < 900000; i++ {
// brk.Publish(ctx, topic, bm)
time.Sleep(1 * time.Second)
if err := brk.Publish(ctx, topic, bm); err != nil {
t.Fatal(err)
}
log.Printf("publish %d", i)
// time.Sleep(1 * time.Second)
}
}()
fmt.Printf("prefill complete\n")
@@ -200,7 +355,7 @@ func BenchmarkPub(b *testing.B) {
}
}()
segmentio.DefaultWriterConfig.Async = true
segmentio.DefaultWriterConfig.Async = false
segmentio.DefaultWriterConfig.BatchTimeout = 1 * time.Second
segmentio.DefaultWriterConfig.RequiredAcks = int(kafka.RequireAll)
fn := func(msgs []kafka.Message, err error) {