|
|
|
@@ -119,7 +119,7 @@ func (s *Subscriber) poll(ctx context.Context) {
|
|
|
|
|
|
|
|
|
|
s.Lock()
|
|
|
|
|
for p, l := range lmap {
|
|
|
|
|
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p)), "lag", strconv.Itoa(int(l.Lag)))
|
|
|
|
|
s.kopts.Meter.Counter(semconv.BrokerGroupLag, "topic", s.topic, "group", s.opts.Group, "partition", strconv.Itoa(int(p))).Set(uint64(l.Lag))
|
|
|
|
|
}
|
|
|
|
|
s.Unlock()
|
|
|
|
|
|
|
|
|
@@ -177,7 +177,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32)
|
|
|
|
|
|
|
|
|
|
func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.connected.Store(0)
|
|
|
|
|
// s.connected.Store(0)
|
|
|
|
|
if s.fatalOnError {
|
|
|
|
|
s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err)
|
|
|
|
|
}
|
|
|
|
@@ -199,7 +199,7 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str
|
|
|
|
|
s.killConsumers(ctx, revoked)
|
|
|
|
|
if err := c.CommitMarkedOffsets(ctx); err != nil {
|
|
|
|
|
s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err)
|
|
|
|
|
s.connected.Store(0)
|
|
|
|
|
// s.connected.Store(0)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -279,7 +279,7 @@ func (pc *consumer) consume() {
|
|
|
|
|
pc.c.MarkCommitRecords(record)
|
|
|
|
|
} else {
|
|
|
|
|
eventPool.Put(p)
|
|
|
|
|
pc.connected.Store(0)
|
|
|
|
|
// pc.connected.Store(0)
|
|
|
|
|
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@@ -296,7 +296,7 @@ func (pc *consumer) consume() {
|
|
|
|
|
pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
|
|
|
|
pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds())
|
|
|
|
|
eventPool.Put(p)
|
|
|
|
|
pc.connected.Store(0)
|
|
|
|
|
// pc.connected.Store(0)
|
|
|
|
|
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
|
|
|
|
|
sp.Finish()
|
|
|
|
|
return
|
|
|
|
@@ -334,7 +334,7 @@ func (pc *consumer) consume() {
|
|
|
|
|
pc.c.MarkCommitRecords(record)
|
|
|
|
|
} else {
|
|
|
|
|
eventPool.Put(p)
|
|
|
|
|
pc.connected.Store(0)
|
|
|
|
|
// pc.connected.Store(0)
|
|
|
|
|
pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
|
|
|
|
|
sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage")
|
|
|
|
|
sp.Finish()
|
|
|
|
|