fix error passing

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-16 01:36:26 +03:00
parent 09f4a15ec4
commit 8f7d51ed3b

20
util.go
View File

@ -2,6 +2,7 @@ package kgo
import (
"context"
"errors"
"sync"
kgo "github.com/twmb/franz-go/pkg/kgo"
@ -9,6 +10,8 @@ import (
"github.com/unistack-org/micro/v3/logger"
)
var ErrLostMessage = errors.New("message not marked for offsets commit and will be lost in next iteration")
var pPool = sync.Pool{
New: func() interface{} {
return &publication{msg: &broker.Message{}}
@ -44,7 +47,7 @@ func (s *subscriber) run(ctx context.Context) {
}
if len(fetches.Errors()) > 0 {
for _, err := range fetches.Errors() {
s.kopts.Logger.Errorf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err)
s.kopts.Logger.Fatalf(ctx, "fetch err topic %s partition %d: %v", err.Topic, err.Partition, err.Err)
}
// TODO: fatal ?
return
@ -62,6 +65,9 @@ func (s *subscriber) run(ctx context.Context) {
return
}
select {
case err := <-w.cherr:
s.kopts.Logger.Fatalf(ctx, "handle err: %v", err)
return
case w.recs <- p.Records:
case <-w.done:
}
@ -159,6 +165,10 @@ func (w *worker) handle() {
_ = eh(p)
if p.ack {
w.reader.MarkCommitRecords(record)
} else {
w.cherr <- ErrLostMessage
pPool.Put(p)
return
}
pPool.Put(p)
continue
@ -186,9 +196,13 @@ func (w *worker) handle() {
}
}
if p.ack {
w.reader.MarkCommitRecords(record)
}
pPool.Put(p)
w.reader.MarkCommitRecords(record)
} else {
pPool.Put(p)
w.cherr <- ErrLostMessage
return
}
}
if paused {
paused = false