From c8eeb34efe7196d9a11bab6c82697fca6886fcdb Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 2 Dec 2025 22:08:09 +0300 Subject: [PATCH] pass errors from broker to subscribers Signed-off-by: Vasiliy Tolstov --- hook_event.go | 12 +-- kgo.go | 1 + kgo_test.go | 6 +- meter.go | 16 ++-- subscriber.go | 199 ++++++++++++++++++++++++++++++++++++++++++++------ 5 files changed, 195 insertions(+), 39 deletions(-) diff --git a/hook_event.go b/hook_event.go index 3372f42..c3a2982 100644 --- a/hook_event.go +++ b/hook_event.go @@ -17,12 +17,12 @@ type hookEvent struct { } var ( - _ kgo.HookBrokerConnect = &hookEvent{} - _ kgo.HookBrokerDisconnect = &hookEvent{} - _ kgo.HookBrokerRead = &hookEvent{} - _ kgo.HookBrokerWrite = &hookEvent{} - _ kgo.HookGroupManageError = &hookEvent{} - _ kgo.HookProduceRecordUnbuffered = &hookEvent{} + _ kgo.HookBrokerConnect = (*hookEvent)(nil) + _ kgo.HookBrokerDisconnect = (*hookEvent)(nil) + _ kgo.HookBrokerRead = (*hookEvent)(nil) + _ kgo.HookBrokerWrite = (*hookEvent)(nil) + _ kgo.HookGroupManageError = (*hookEvent)(nil) + _ kgo.HookProduceRecordUnbuffered = (*hookEvent)(nil) ) func (m *hookEvent) OnGroupManageError(err error) { diff --git a/kgo.go b/kgo.go index bd72dea..d34012a 100644 --- a/kgo.go +++ b/kgo.go @@ -489,6 +489,7 @@ func (k *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han kgo.OnPartitionsLost(sub.lost), kgo.AutoCommitCallback(sub.autocommit), kgo.AutoCommitMarks(), + kgo.WithHooks(sub), ) if options.Context != nil { diff --git a/kgo_test.go b/kgo_test.go index 891f504..93dcc97 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -17,7 +17,7 @@ import ( ) var ( - msgcnt = int64(1200) + msgcnt = int64(10) group = "38" prefill = true loglevel = logger.DebugLevel @@ -80,7 +80,7 @@ func TestFail(t *testing.T) { for _, msg := range msgs { t.Logf("broker publish") if err := b.Publish(ctx, "test", msg); err != nil { - t.Fatal(err) + break } } // t.Skip() @@ -108,7 +108,7 @@ func TestFail(t *testing.T) { for { t.Logf("health check") - if !b.Health() { + if b.Health() { t.Logf("health works") break } diff --git a/meter.go b/meter.go index a9123fb..8ee7012 100644 --- a/meter.go +++ b/meter.go @@ -14,18 +14,18 @@ type hookMeter struct { } var ( - _ kgo.HookBrokerConnect = &hookMeter{} - _ kgo.HookBrokerDisconnect = &hookMeter{} + _ kgo.HookBrokerConnect = (*hookMeter)(nil) + _ kgo.HookBrokerDisconnect = (*hookMeter)(nil) // HookBrokerE2E - _ kgo.HookBrokerRead = &hookMeter{} - _ kgo.HookBrokerThrottle = &hookMeter{} - _ kgo.HookBrokerWrite = &hookMeter{} - _ kgo.HookFetchBatchRead = &hookMeter{} + _ kgo.HookBrokerRead = (*hookMeter)(nil) + _ kgo.HookBrokerThrottle = (*hookMeter)(nil) + _ kgo.HookBrokerWrite = (*hookMeter)(nil) + _ kgo.HookFetchBatchRead = (*hookMeter)(nil) // HookFetchRecordBuffered // HookFetchRecordUnbuffered - _ kgo.HookGroupManageError = &hookMeter{} + _ kgo.HookGroupManageError = (*hookMeter)(nil) // HookNewClient - _ kgo.HookProduceBatchWritten = &hookMeter{} + _ kgo.HookProduceBatchWritten = (*hookMeter)(nil) // HookProduceRecordBuffered // HookProduceRecordPartitioned // HookProduceRecordUnbuffered diff --git a/subscriber.go b/subscriber.go index de487f1..e221e90 100644 --- a/subscriber.go +++ b/subscriber.go @@ -3,6 +3,8 @@ package kgo import ( "context" "fmt" + "maps" + "net" "strconv" "sync" "sync/atomic" @@ -60,7 +62,7 @@ type Subscriber struct { closed bool fatalOnError bool - sync.RWMutex + mu sync.RWMutex } func (s *Subscriber) Client() *kgo.Client { @@ -114,13 +116,24 @@ func (s *Subscriber) poll(ctx context.Context) { fetches := s.c.PollRecords(ctx, maxInflight) if !s.closed && fetches.IsClientClosed() { s.closed = true + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(kgo.ErrClientClosed, tp.t, tp.p) + } + } return } fetches.EachError(func(t string, p int32, err error) { - if kgo.IsRetryableBrokerErr(err) { - s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err) - } else { - s.kopts.Logger.Fatal(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err) + tps := tp{t, p} + s.mu.Lock() + c := s.consumers[tps] + s.mu.Unlock() + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, t, p) } }) @@ -141,7 +154,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) for _, partition := range partitions { tps := tp{topic, partition} pc, ok := s.consumers[tps] - if !ok { + if !ok || pc == nil { continue } delete(s.consumers, tps) @@ -155,11 +168,20 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) } } -func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { +func (s *Subscriber) autocommit(_ *kgo.Client, r *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { if err != nil { - // s.connected.Store(0) - if s.fatalOnError { - s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for _, tc := range r.Topics { + for _, c := range s.consumers { + if c != nil { + for _, p := range tc.Partitions { + c.recs <- newErrorFetchTopicPartition(err, tc.Topic, p.Partition) + } + } + } } } } @@ -169,7 +191,6 @@ func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][] s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] lost %#+v", lost)) } s.killConsumers(ctx, lost) - // s.connected.Store(0) } func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { @@ -178,8 +199,15 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str } s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { - s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) - // s.connected.Store(0) + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range s.consumers { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } } } @@ -199,9 +227,9 @@ func (s *Subscriber) assigned(_ context.Context, c *kgo.Client, assigned map[str opts: s.opts, connected: s.connected, } - s.Lock() + s.mu.Lock() s.consumers[tp{topic, partition}] = pc - s.Unlock() + s.mu.Unlock() go pc.consume() } } @@ -219,11 +247,24 @@ func (pc *consumer) consume() { eh = pc.opts.ErrorHandler } + var pm *event + for { select { case <-pc.quit: return case p := <-pc.recs: + if p.Err != nil || p.FetchPartition.Err != nil { + if p.Err != nil { + pm = pc.newErrorMessage(p.Err, p.Topic, p.Partition) + } else if p.FetchPartition.Err != nil { + pm = pc.newErrorMessage(p.FetchPartition.Err, p.Topic, p.Partition) + } + _ = pc.handler(pm) + eventPool.Put(pm) + return + } + for _, record := range p.Records { ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() @@ -269,9 +310,12 @@ func (pc *consumer) consume() { if p.ack { pc.c.MarkCommitRecords(record) } else { + if sp != nil { + sp.Finish() + } eventPool.Put(p) - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + pm := pc.newErrorMessage(ErrLostMessage, record.Topic, record.Partition) + pc.handler(pm) return } eventPool.Put(p) @@ -280,15 +324,16 @@ func (pc *consumer) consume() { pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) continue } else { - pc.kopts.Logger.Error(pc.kopts.Context, "[kgo]: unmarshal error", err) + pm := pc.newErrorMessage(err, record.Topic, record.Partition) + pc.handler(pm) } te := time.Since(ts) pc.kopts.Meter.Counter(semconv.SubscribeMessageInflight, "endpoint", record.Topic, "topic", record.Topic).Dec() pc.kopts.Meter.Summary(semconv.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) pc.kopts.Meter.Histogram(semconv.SubscribeMessageDurationSeconds, "endpoint", record.Topic, "topic", record.Topic).Update(te.Seconds()) eventPool.Put(p) - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?") + pm := pc.newErrorMessage(ErrLostMessage, record.Topic, record.Partition) + pc.handler(pm) if sp != nil { sp.Finish() } @@ -337,8 +382,8 @@ func (pc *consumer) consume() { pc.c.MarkCommitRecords(record) } else { eventPool.Put(p) - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] ErrLostMessage wtf?") + pm := pc.newErrorMessage(ErrLostMessage, record.Topic, record.Partition) + pc.handler(pm) if sp != nil { sp.SetStatus(tracer.SpanStatusError, "ErrLostMessage") sp.Finish() @@ -352,3 +397,113 @@ func (pc *consumer) consume() { } } } + +func (pc *consumer) newErrorMessage(err error, t string, p int32) *event { + pm := eventPool.Get().(*event) + + pm.ack = false + pm.msg = &broker.Message{Header: metadata.New(2)} + pm.err = err + pm.topic = t + pm.ctx = context.Background() + pm.msg.Header.Set("Micro-Partition", strconv.FormatInt(int64(p), 10)) + pm.msg.Header.Set("Micro-Topic", t) + return pm +} + +func newErrorFetchTopicPartition(err error, t string, p int32) kgo.FetchTopicPartition { + return kgo.FetchTopicPartition{ + Topic: t, + FetchPartition: kgo.FetchPartition{ + Partition: p, + Err: err, + }, + } +} + +var ( + _ kgo.HookBrokerConnect = (*Subscriber)(nil) + _ kgo.HookBrokerDisconnect = (*Subscriber)(nil) + _ kgo.HookBrokerRead = (*Subscriber)(nil) + _ kgo.HookBrokerWrite = (*Subscriber)(nil) + _ kgo.HookGroupManageError = (*Subscriber)(nil) + _ kgo.HookProduceRecordUnbuffered = (*Subscriber)(nil) +) + +func (s *Subscriber) OnGroupManageError(err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { +} + +func (s *Subscriber) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +}