backport
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-02-21 15:08:58 +03:00
parent 23f0ad0f2f
commit 0a395235d6
11 changed files with 490 additions and 60 deletions

View File

@@ -64,6 +64,7 @@ func (s *subscriber) Unsubscribe(ctx context.Context) error {
s.killConsumers(ctx, kc)
close(s.done)
s.closed = true
s.c.ResumeFetchTopics(s.topic)
}
return nil
}
@@ -141,7 +142,7 @@ func (s *subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str
quit: make(chan struct{}),
done: make(chan struct{}),
recs: make(chan kgo.FetchTopicPartition, 4),
recs: make(chan kgo.FetchTopicPartition, 100),
handler: s.handler,
kopts: s.kopts,
opts: s.opts,
@@ -168,46 +169,63 @@ func (pc *consumer) consume() {
return
case p := <-pc.recs:
for _, record := range p.Records {
// ts := time.Now()
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc()
p := eventPool.Get().(*event)
p.msg.Header = nil
p.msg.Body = nil
p.topic = record.Topic
p.err = nil
p.ack = false
p.msg.Header = metadata.New(len(record.Headers))
for _, hdr := range record.Headers {
p.msg.Header.Set(hdr.Key, string(hdr.Value))
}
if pc.kopts.Codec.String() == "noop" {
p.msg.Header = metadata.New(len(record.Headers))
for _, hdr := range record.Headers {
p.msg.Header.Set(hdr.Key, string(hdr.Value))
}
p.msg.Body = record.Value
} else if pc.opts.BodyOnly {
p.msg.Body = record.Value
} else {
if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
p.err = err
p.msg.Body = record.Value
if eh != nil {
_ = eh(p)
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
if p.ack {
pc.c.MarkCommitRecords(record)
} else {
eventPool.Put(p)
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
return
}
eventPool.Put(p)
// te := time.Since(ts)
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
continue
} else {
if pc.kopts.Logger.V(logger.ErrorLevel) {
pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
}
}
// te := time.Since(ts)
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
eventPool.Put(p)
pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
return
}
}
err := pc.handler(p)
if err == nil {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc()
} else {
// pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
}
// pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
if err == nil && pc.opts.AutoAck {
p.ack = true
} else if err != nil {
@@ -220,6 +238,9 @@ func (pc *consumer) consume() {
}
}
}
// te := time.Since(ts)
// pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
// pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
if p.ack {
eventPool.Put(p)
pc.c.MarkCommitRecords(record)