Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
@@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
||||
|
||||
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
|
||||
if err != nil {
|
||||
s.connected.Store(0)
|
||||
// s.connected.Store(0)
|
||||
if s.fatalOnError {
|
||||
s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err)
|
||||
}
|
||||
@@ -199,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
||||
s.killConsumers(ctx, revoked)
|
||||
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
||||
s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
|
||||
s.connected.Store(0)
|
||||
// s.connected.Store(0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,7 +279,7 @@ func (pc *consumer) consume() {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
pc.connected.Store(0)
|
||||
// pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
return
|
||||
}
|
||||
@@ -296,7 +296,7 @@ func (pc *consumer) consume() {
|
||||
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
||||
eventPool.Put(p)
|
||||
pc.connected.Store(0)
|
||||
// pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
||||
sp.Finish()
|
||||
return
|
||||
@@ -334,7 +334,7 @@ func (pc *consumer) consume() {
|
||||
pc.c.MarkCommitRecords(record)
|
||||
} else {
|
||||
eventPool.Put(p)
|
||||
pc.connected.Store(0)
|
||||
// pc.connected.Store(0)
|
||||
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
||||
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
|
||||
sp.Finish()
|
||||
|
Reference in New Issue
Block a user