package segmentio_test

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/segmentio/kafka-go"
	segmentio "go.unistack.org/micro-broker-segmentio/v3"
	victoriameter "go.unistack.org/micro-meter-victoriametrics/v3"
	https "go.unistack.org/micro-server-http/v3"
	"go.unistack.org/micro/v3/broker"
	"go.unistack.org/micro/v3/codec"
	"go.unistack.org/micro/v3/logger"
	"go.unistack.org/micro/v3/meter"
	meterhandler "go.unistack.org/micro/v3/meter/handler"
	"go.unistack.org/micro/v3/server"
)

type lg struct{}

func (l *lg) Printf(format string, args ...interface{}) {
	//	logger.Infof(context.Background(), format, args...)
}

var bm = &broker.Message{
	Header: map[string]string{"hkey": "hval"},
	Body:   []byte(`"body"`),
}

func TestConsumerGroup(t *testing.T) {
	topic := "test_topic"
	if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
		t.Skip()
	}

	if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)); err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	var addrs []string
	if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	} else {
		addrs = strings.Split(addr, ",")
	}

	meter.DefaultMeter = victoriameter.NewMeter()

	s := https.NewServer(server.Context(ctx), server.Address("127.0.0.1:0"), server.Codec("text/plain", codec.NewCodec()))
	if err := s.Init(); err != nil {
		t.Fatal(err)
	}
	if err := meterhandler.RegisterMeterServer(s, meterhandler.NewHandler()); err != nil {
		t.Fatal(err)
	}

	if err := s.Start(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := s.Stop(); err != nil {
			t.Fatal(err)
		}
	}()

	segmentio.DefaultWriterConfig.Async = true
	segmentio.DefaultWriterConfig.BatchTimeout = 1 * time.Second
	segmentio.DefaultWriterConfig.RequiredAcks = int(kafka.RequireAll)
	segmentio.DefaultReaderConfig.StartOffset = kafka.FirstOffset
	segmentio.DefaultReaderConfig.MinBytes = 1024 * 10        // 10 kb
	segmentio.DefaultReaderConfig.MaxBytes = 1024 * 1024 * 20 // 20 Mb
	segmentio.DefaultReaderConfig.MaxWait = 20 * time.Second  // 20s
	segmentio.DefaultReaderConfig.QueueCapacity = 500
	segmentio.DefaultReaderConfig.ReadBackoffMin = 2 * time.Second
	segmentio.DefaultReaderConfig.ReadBackoffMax = 5 * time.Second
	segmentio.DefaultReaderConfig.Logger = &lg{}
	segmentio.DefaultReaderConfig.CommitInterval = 1 * time.Second
	brk := segmentio.NewBroker(broker.Context(ctx), broker.Addrs(addrs...), segmentio.StatsInterval(5*time.Second),
		segmentio.ClientID("test_sub"),
	)
	t.Logf("init")
	if err := brk.Init(); err != nil {
		t.Fatal(err)
	}

	t.Logf("connect")
	if err := brk.Connect(ctx); err != nil {
		t.Fatal(err)
	}

	defer func() {
		t.Logf("disconnect")
		if err := brk.Disconnect(ctx); err != nil {
			t.Fatal(err)
		}
	}()

	fmt.Printf("prefill topic\n")
	go func() {
		for i := 0; i < 900000; i++ {
			if err := brk.Publish(ctx, topic, bm); err != nil {
				t.Fatal(err)
			}
			// log.Printf("publish %d", i)
			time.Sleep(600 * time.Millisecond)
		}
	}()
	fmt.Printf("prefill complete\n")

	var cnt uint64
	var wait atomic.Value
	wait.Store(true)

	fn := func(msg broker.Event) error {
		if wait.Load().(bool) {
			wait.Store(false)
		}
		atomic.AddUint64(&cnt, 1)
		fmt.Printf("processing mesage")
		return msg.Ack()
	}

	sub1, err := brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
	if err != nil {
		t.Fatal(err)
	}
	defer sub1.Unsubscribe(ctx)

	sub2, err := brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
	if err != nil {
		t.Fatal(err)
	}
	defer sub2.Unsubscribe(ctx)

	fmt.Printf("wait for ready\n")
	for {
		if !wait.Load().(bool) {
			break
		}
		time.Sleep(1 * time.Second)
	}
	/*
		time.Sleep(5 * time.Second)
		fmt.Printf("unsub\n")
		if err := sub1.Unsubscribe(ctx); err != nil {
			t.Fatal(err)
		}
		time.Sleep(9 * time.Second)
		sub1, err = brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
		if err != nil {
			t.Fatal(err)
		}
	*/
	/*
		t1 := time.NewTicker(10 * time.Second)
		defer t1.Stop()
		t2 := time.NewTicker(30 * time.Second)
		defer t2.Stop()

			for {
				select {
				case <-t1.C:
					fmt.Printf("unsub from sub2\n")
					if err := sub2.Unsubscribe(ctx); err != nil {
						t.Fatal(err)
					}
					fmt.Printf("sub1\n")
					sub1, err = brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
					if err != nil {
						t.Fatal(err)
					}
				case <-t2.C:
					fmt.Printf("unsub from sub1\n")
					if err := sub1.Unsubscribe(ctx); err != nil {
						t.Fatal(err)
					}
					fmt.Printf("sub2\n")
					sub2, err = brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true), broker.SubscribeAutoAck(true))
					if err != nil {
						t.Fatal(err)
					}
				}
			}
	*/
	select {}
}

func TestSub(t *testing.T) {
	topic := "test_topic"
	if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
		t.Skip()
	}

	if err := logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)); err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	var addrs []string
	if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	} else {
		addrs = strings.Split(addr, ",")
	}

	meter.DefaultMeter = victoriameter.NewMeter()

	s := https.NewServer(server.Context(ctx), server.Address("127.0.0.1:0"), server.Codec("text/plain", codec.NewCodec()))
	if err := s.Init(); err != nil {
		t.Fatal(err)
	}
	if err := meterhandler.RegisterMeterServer(s, meterhandler.NewHandler()); err != nil {
		t.Fatal(err)
	}

	if err := s.Start(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := s.Stop(); err != nil {
			t.Fatal(err)
		}
	}()

	segmentio.DefaultWriterConfig.Async = true
	segmentio.DefaultWriterConfig.BatchTimeout = 1 * time.Second
	segmentio.DefaultWriterConfig.RequiredAcks = int(kafka.RequireAll)
	segmentio.DefaultReaderConfig.StartOffset = kafka.FirstOffset
	segmentio.DefaultReaderConfig.MinBytes = 1024 * 10        // 10 kb
	segmentio.DefaultReaderConfig.MaxBytes = 1024 * 1024 * 20 // 20 Mb
	segmentio.DefaultReaderConfig.MaxWait = 20 * time.Second  // 20s
	segmentio.DefaultReaderConfig.QueueCapacity = 500
	segmentio.DefaultReaderConfig.ReadBackoffMin = 2 * time.Second
	segmentio.DefaultReaderConfig.ReadBackoffMax = 5 * time.Second
	segmentio.DefaultReaderConfig.Logger = &lg{}
	segmentio.DefaultReaderConfig.CommitInterval = 1 * time.Second
	brk := segmentio.NewBroker(broker.Context(ctx), broker.Addrs(addrs...), segmentio.StatsInterval(5*time.Second),
		segmentio.ClientID("test_sub"),
	)
	t.Logf("init")
	if err := brk.Init(); err != nil {
		t.Fatal(err)
	}

	t.Logf("connect")
	if err := brk.Connect(ctx); err != nil {
		t.Fatal(err)
	}

	defer func() {
		t.Logf("disconnect")
		if err := brk.Disconnect(ctx); err != nil {
			t.Fatal(err)
		}
	}()

	fmt.Printf("prefill topic\n")

	go func() {
		for i := 0; i < 900000; i++ {
			if err := brk.Publish(ctx, topic, bm); err != nil {
				t.Fatal(err)
			}
			log.Printf("publish %d", i)
			//		time.Sleep(1 * time.Second)
		}
	}()
	fmt.Printf("prefill complete\n")

	var cnt uint64
	var wait atomic.Value
	wait.Store(true)

	done := make(chan struct{})
	fn := func(msg broker.Event) error {
		if wait.Load().(bool) {
			wait.Store(false)
			fmt.Printf("done ready\n")
			close(done)
		}
		atomic.AddUint64(&cnt, 1)
		return msg.Ack()
	}

	sub, err := brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true))
	if err != nil {
		t.Fatal(err)
	}
	fmt.Printf("wait for ready\n")
	<-done
	fmt.Printf("wait for bench\n")
	fmt.Printf("start %s\n", time.Now().String())
	<-time.After(20 * time.Second)
	fmt.Printf("stop %s\n", time.Now().String())
	rcnt := atomic.LoadUint64(&cnt)

	req, err := http.NewRequest(http.MethodGet, "http://"+s.Options().Address+"/metrics", nil)
	if err != nil {
		t.Fatal(err)
	}
	req.Header.Add("Content-Type", "text/plain")

	rsp, err := (&http.Client{}).Do(req)
	if err != nil {
		t.Fatal(err)
	}

	defer rsp.Body.Close()

	buf, err := io.ReadAll(rsp.Body)
	if err != nil {
		t.Fatal(err)
	}

	fmt.Printf("unsub\n")
	if err := sub.Unsubscribe(ctx); err != nil {
		t.Fatal(err)
	}

	t.Logf("metrics: \n%s\n", buf)
	t.Logf("mesage count %d\n", rcnt)
}

func BenchmarkPub(b *testing.B) {
	if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
		b.Skip()
	}

	if err := logger.DefaultLogger.Init(logger.WithLevel(logger.TraceLevel)); err != nil {
		b.Fatal(err)
	}
	ctx := context.Background()

	var addrs []string
	if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	} else {
		addrs = strings.Split(addr, ",")
	}

	meter.DefaultMeter = victoriameter.NewMeter()

	s := https.NewServer(server.Context(ctx), server.Address("127.0.0.1:0"), server.Codec("text/plain", codec.NewCodec()))
	if err := s.Init(); err != nil {
		b.Fatal(err)
	}
	if err := meterhandler.RegisterMeterServer(s, meterhandler.NewHandler()); err != nil {
		b.Fatal(err)
	}

	if err := s.Start(); err != nil {
		b.Fatal(err)
	}
	defer func() {
		if err := s.Stop(); err != nil {
			b.Fatal(err)
		}
	}()

	segmentio.DefaultWriterConfig.Async = false
	segmentio.DefaultWriterConfig.BatchTimeout = 1 * time.Second
	segmentio.DefaultWriterConfig.RequiredAcks = int(kafka.RequireAll)
	fn := func(msgs []kafka.Message, err error) {
		if err != nil {
			b.Logf("err %v", err)
		}
	}
	brk := segmentio.NewBroker(broker.Context(ctx), broker.Addrs(addrs...), segmentio.StatsInterval(1*time.Second),
		segmentio.WriterCompletionFunc(fn))
	b.Logf("init")
	if err := brk.Init(); err != nil {
		b.Fatal(err)
	}

	b.Logf("connect")
	if err := brk.Connect(ctx); err != nil {
		b.Fatal(err)
	}
	defer func() {
		b.Logf("disconnect")
		if err := brk.Disconnect(ctx); err != nil {
			b.Fatal(err)
		}
	}()

	cnt := 0
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		if err := brk.Publish(ctx, "test_topic", bm); err != nil {
			b.Fatal(err)
		}
		cnt++
	}

	req, err := http.NewRequest(http.MethodGet, "http://"+s.Options().Address+"/metrics", nil)
	if err != nil {
		b.Fatal(err)
	}
	req.Header.Add("Content-Type", "text/plain")

	rsp, err := (&http.Client{}).Do(req)
	if err != nil {
		b.Fatal(err)
	}

	defer rsp.Body.Close()

	buf, err := io.ReadAll(rsp.Body)
	if err != nil {
		b.Fatal(err)
	}

	b.Logf("metrics: \n%s\n", buf)
	b.Logf("mesage count %d\n", cnt)
}

func BenchmarkPubSub(b *testing.B) {
	b.Skip()
	ctx := context.Background()
	topic := "test_topic"
	var addrs []string
	if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	} else {
		addrs = strings.Split(addr, ",")
	}

	segmentio.DefaultWriterConfig.Async = true
	segmentio.DefaultWriterConfig.BatchTimeout = 1 * time.Second
	segmentio.DefaultReaderConfig.CommitInterval = 2 * time.Second
	brk := segmentio.NewBroker(broker.Context(ctx), broker.Addrs(addrs...), segmentio.StatsInterval(1*time.Minute))
	if err := brk.Init(); err != nil {
		b.Fatal(err)
	}

	if err := brk.Connect(ctx); err != nil {
		b.Fatal(err)
	}
	defer func() {
		if err := brk.Disconnect(ctx); err != nil {
			b.Fatal(err)
		}
	}()

	wait := true
	var cnt uint64
	fn := func(msg broker.Event) error {
		if wait {
			wait = false
		}
		atomic.AddUint64(&cnt, 1)
		return msg.Ack()
	}

	if err := brk.Publish(ctx, topic, bm); err != nil {
		b.Fatal(err)
	}

	sub, err := brk.Subscribe(ctx, topic, fn, broker.SubscribeGroup("test"), broker.SubscribeBodyOnly(true))
	if err != nil {
		b.Fatal(err)
	}
	defer func() {
		if err := sub.Unsubscribe(ctx); err != nil {
			b.Fatal(err)
		}
	}()

	for {
		if !wait {
			break
		}
		time.Sleep(1 * time.Second)
	}
	b.ResetTimer()
	var result error
	sent := uint64(0)
	for n := 0; n < b.N; n++ {
		if err := brk.Publish(ctx, topic, bm); err != nil {
			b.Fatal(err)
		} else {
			result = err
		}
		sent++
	}

	b.Logf("publish done")
	for {
		c := atomic.LoadUint64(&cnt)
		if c >= sent {
			break
		}
		fmt.Printf("c %d seen %d\n", c, sent)
		time.Sleep(1 * time.Second)
	}
	_ = result
	fmt.Printf("c %d seen %d\n", atomic.LoadUint64(&cnt), sent)
}

func TestPubSub(t *testing.T) {
	if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 {
		t.Skip()
	}

	if err := logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)); err != nil {
		t.Fatal(err)
	}
	ctx := context.Background()

	var addrs []string
	if addr := os.Getenv("BROKER_ADDRS"); len(addr) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	} else {
		addrs = strings.Split(addr, ",")
	}

	meter.DefaultMeter = victoriameter.NewMeter()

	s := https.NewServer(server.Context(ctx), server.Address("127.0.0.1:0"), server.Codec("text/plain", codec.NewCodec()))
	if err := s.Init(); err != nil {
		t.Fatal(err)
	}
	if err := meterhandler.RegisterMeterServer(s, meterhandler.NewHandler()); err != nil {
		t.Fatal(err)
	}

	if err := s.Start(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := s.Stop(); err != nil {
			t.Fatal(err)
		}
	}()

	b := segmentio.NewBroker(broker.Context(ctx), broker.Addrs(addrs...), segmentio.StatsInterval(500*time.Millisecond),
		segmentio.ClientID("test_pubsub"))
	t.Logf("init")
	if err := b.Init(); err != nil {
		t.Fatal(err)
	}

	t.Logf("connect")
	if err := b.Connect(ctx); err != nil {
		t.Fatal(err)
	}
	defer func() {
		t.Logf("disconnect")
		if err := b.Disconnect(ctx); err != nil {
			t.Fatal(err)
		}
	}()

	wait := true
	fn := func(msg broker.Event) error {
		wait = false
		return msg.Ack()
	}

	t.Logf("subscribe")
	sub, err := b.Subscribe(ctx, "test_topic", fn, broker.SubscribeGroup("test"))
	if err != nil {
		t.Fatal(err)
	}

	defer func() {
		t.Logf("unsubscribe")
		if err := sub.Unsubscribe(ctx); err != nil {
			t.Fatal(err)
		}
	}()

	if err := b.Publish(ctx, "test_topic", bm); err != nil {
		t.Fatal(err)
	}

	for {
		if !wait {
			break
		}
		time.Sleep(1 * time.Second)
	}

	req, err := http.NewRequest(http.MethodGet, "http://"+s.Options().Address+"/metrics", nil)
	if err != nil {
		t.Fatal(err)
	}
	req.Header.Add("Content-Type", "text/plain")

	rsp, err := (&http.Client{}).Do(req)
	if err != nil {
		t.Fatal(err)
	}

	defer rsp.Body.Close()

	buf, err := io.ReadAll(rsp.Body)
	if err != nil {
		t.Fatal(err)
	}

	t.Logf("metrics: \n%s\n", buf)
}