dont modify connected state on errors
Some checks failed
coverage / build (push) Failing after 1m1s
test / test (push) Successful in 5m12s
sync / sync (push) Has been skipped

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-05-02 19:08:54 +03:00
parent db97b24904
commit ccd912adb2
3 changed files with 19 additions and 27 deletions

View File

@@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
if err != nil {
s.connected.Store(0)
// s.connected.Store(0)
if s.fatalOnError {
s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err)
}
@@ -282,7 +282,7 @@ func (pc *consumer) consume() {
pc.c.MarkCommitRecords(record)
} else {
sp.Finish()
pc.connected.Store(0)
// pc.connected.Store(0)
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited")
return
}