diff --git a/kgo.go b/kgo.go index 0eda025..5eeef97 100644 --- a/kgo.go +++ b/kgo.go @@ -19,6 +19,12 @@ import ( "golang.org/x/sync/errgroup" ) +var pPool = sync.Pool{ + New: func() interface{} { + return &broker.Message{} + }, +} + type kBroker struct { writer *kgo.Client // used only to push messages kopts []kgo.Opt @@ -397,7 +403,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err } // preallocate optimistic - crecords := make([]*kgo.Record, 0, 1000) + crecords := make([]*kgo.Record, 0, 10000) eh := s.kopts.ErrorHandler if s.opts.ErrorHandler != nil { @@ -430,7 +436,6 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err return err case <-ticker.C: v := atomic.LoadInt64(&cnt) - s.kopts.Logger.Debugf(ctx, "records to commit: %v", v) if v == 0 { return nil } @@ -439,9 +444,8 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err mu.Unlock() return err } - s.kopts.Logger.Debugf(ctx, "records to need process after commit: %v", v-int64(len(crecords))) atomic.AddInt64(&cnt, -int64(len(crecords))) - crecords = nil + crecords = crecords[:0] mu.Unlock() } } @@ -457,7 +461,10 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err case <-gctx.Done(): return gctx.Err() default: - p := &publication{topic: record.Topic, msg: &broker.Message{}} + msg := pPool.Get().(*broker.Message) + msg.Header = nil + msg.Body = nil + p := &publication{topic: record.Topic, msg: msg} if s.opts.BodyOnly { p.msg.Body = record.Value } else { @@ -471,12 +478,14 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err crecords = append(crecords, record) mu.Unlock() } + pPool.Put(msg) return nil } else { if s.kopts.Logger.V(logger.ErrorLevel) { s.kopts.Logger.Errorf(s.kopts.Context, "[kgo]: failed to unmarshal: %v", err) } } + pPool.Put(msg) return err } } @@ -501,6 +510,7 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err mu.Unlock() } } + pPool.Put(msg) } } return nil @@ -511,10 +521,6 @@ func (s *subscriber) handleFetches(ctx context.Context, fetches kgo.Fetches) err return err } - if s.kopts.Logger.V(logger.DebugLevel) { - logger.Debugf(ctx, "commit %d records", len(crecords)) - } - return s.reader.CommitRecords(ctx, crecords...) } @@ -525,6 +531,7 @@ func (k *kBroker) String() string { func NewBroker(opts ...broker.Option) broker.Broker { options := broker.NewOptions(opts...) kopts := []kgo.Opt{ + kgo.BatchCompression(kgo.NoCompression()), kgo.WithLogger(&mlogger{l: options.Logger, ctx: options.Context}), kgo.RequiredAcks(kgo.AllISRAcks()), kgo.RetryBackoffFn( diff --git a/kgo_test.go b/kgo_test.go index 8f11fb3..7e29f98 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -2,7 +2,6 @@ package kgo_test import ( "context" - "fmt" "os" "strings" "sync/atomic" @@ -40,9 +39,8 @@ func TestPubSub(t *testing.T) { b := kgo.NewBroker( broker.Codec(jsoncodec.NewCodec()), broker.Addrs(addrs...), - kgo.ClientID("test"), - kgo.CommitInterval(1*time.Second), - kgo.Options(kg.FetchMaxBytes(10*1024)), + kgo.CommitInterval(5*time.Second), + kgo.Options(kg.ClientID("test"), kg.FetchMaxBytes(10*1024*1024)), ) if err := b.Init(); err != nil { t.Fatal(err) @@ -57,10 +55,8 @@ func TestPubSub(t *testing.T) { t.Fatal(err) } }() - _ = bm - /* - fmt.Printf("prefill") +/* msgs := make([]*broker.Message, 0, 600000) for i := 0; i < 600000; i++ { msgs = append(msgs, bm) @@ -70,7 +66,7 @@ func TestPubSub(t *testing.T) { t.Fatal(err) } t.Skip() - */ +*/ done := make(chan bool, 1) idx := int64(0) fn := func(msg broker.Event) error { @@ -79,7 +75,10 @@ func TestPubSub(t *testing.T) { return msg.Ack() } - sub, err := b.Subscribe(ctx, "test", fn, broker.SubscribeAutoAck(true), broker.SubscribeGroup("test17"), broker.SubscribeBodyOnly(true)) + sub, err := b.Subscribe(ctx, "test", fn, + broker.SubscribeAutoAck(true), + broker.SubscribeGroup("test23"), + broker.SubscribeBodyOnly(true)) if err != nil { t.Fatal(err) } @@ -89,14 +88,16 @@ func TestPubSub(t *testing.T) { } }() - for { - if v := atomic.LoadInt64(&idx); v == 12637303 { - close(done) - break - } else { - fmt.Printf("processed %v\n", v) + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + + go func() { + for { + select { + case <-ticker.C: + close(done) + } } - time.Sleep(1 * time.Second) - } + }() <-done }