From 8f7d51ed3b8d9d1b024f2c2c9a2e965c3681995f Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 16 Sep 2021 01:36:26 +0300 Subject: [PATCH] fix error passing Signed-off-by: Vasiliy Tolstov --- util.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/util.go b/util.go index 5f8008f..4551a7a 100644 --- a/util.go +++ b/util.go @@ -2,6 +2,7 @@ package kgo import ( "context" + "errors" "sync" kgo "github.com/twmb/franz-go/pkg/kgo" @@ -9,6 +10,8 @@ import ( "github.com/unistack-org/micro/v3/logger" ) +var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration") + var pPool = sync.Pool{ New: func() interface{} { return &publication{msg: &broker.Message{}} @@ -44,7 +47,7 @@ func (s *subscriber) run(ctx context.Context) { } if len(fetches.Errors()) > 0 { for _, err := range fetches.Errors() { - s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err) + s.kopts.Logger.Fatalf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err) } // TODO: fatal ? return @@ -62,6 +65,9 @@ func (s *subscriber) run(ctx context.Context) { return } select { + case err := <-w.cherr: + s.kopts.Logger.Fatalf(ctx, "handle err: %v", err) + return case w.recs <- p.Records: case <-w.done: } @@ -159,6 +165,10 @@ func (w *worker) handle() { _ = eh(p) if p.ack { w.reader.MarkCommitRecords(record) + } else { + w.cherr <- ErrLostMessage + pPool.Put(p) + return } pPool.Put(p) continue @@ -186,9 +196,13 @@ func (w *worker) handle() { } } if p.ack { + pPool.Put(p) w.reader.MarkCommitRecords(record) + } else { + pPool.Put(p) + w.cherr <- ErrLostMessage + return } - pPool.Put(p) } if paused { paused = false