diff --git a/broker.go b/broker.go index f3cd752..a7b2ece 100644 --- a/broker.go +++ b/broker.go @@ -27,7 +27,7 @@ var ( func (m *hookEvent) OnGroupManageError(err error) { if err != nil { - m.connected.Store(0) + // m.connected.Store(0) // if m.fatalOnError { m.log.Error(context.TODO(), "kgo.OnGroupManageError", err) //} diff --git a/subscriber.go b/subscriber.go index 2dbe756..d891f8f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { if err != nil { - s.connected.Store(0) + // s.connected.Store(0) if s.fatalOnError { s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) } @@ -199,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) - s.connected.Store(0) + // s.connected.Store(0) } } @@ -279,7 +279,7 @@ func (pc *consumer) consume() { pc.c.MarkCommitRecords(record) } else { eventPool.Put(p) - pc.connected.Store(0) + // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") return } @@ -296,7 +296,7 @@ func (pc *consumer) consume() { pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) eventPool.Put(p) - pc.connected.Store(0) + // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") sp.Finish() return @@ -334,7 +334,7 @@ func (pc *consumer) consume() { pc.c.MarkCommitRecords(record) } else { eventPool.Put(p) - pc.connected.Store(0) + // pc.connected.Store(0) pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") sp.Finish()