From 7a29f49751cc0602bc630b26d82b50003ddfe9ff Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 2 Dec 2025 17:19:18 +0300 Subject: [PATCH] pass errors from broker to subscribers Signed-off-by: Vasiliy Tolstov --- hook_event.go | 12 +-- kgo.go | 15 ++-- kgo_test.go | 2 +- meter.go | 16 ++-- subscriber.go | 214 ++++++++++++++++++++++++++++++++++++++++++++------ 5 files changed, 213 insertions(+), 46 deletions(-) diff --git a/hook_event.go b/hook_event.go index 83afda9..8b427c0 100644 --- a/hook_event.go +++ b/hook_event.go @@ -17,12 +17,12 @@ type hookEvent struct { } var ( - _ kgo.HookBrokerConnect = &hookEvent{} - _ kgo.HookBrokerDisconnect = &hookEvent{} - _ kgo.HookBrokerRead = &hookEvent{} - _ kgo.HookBrokerWrite = &hookEvent{} - _ kgo.HookGroupManageError = &hookEvent{} - _ kgo.HookProduceRecordUnbuffered = &hookEvent{} + _ kgo.HookBrokerConnect = (*hookEvent)(nil) + _ kgo.HookBrokerDisconnect = (*hookEvent)(nil) + _ kgo.HookBrokerRead = (*hookEvent)(nil) + _ kgo.HookBrokerWrite = (*hookEvent)(nil) + _ kgo.HookGroupManageError = (*hookEvent)(nil) + _ kgo.HookProduceRecordUnbuffered = (*hookEvent)(nil) ) func (m *hookEvent) OnGroupManageError(err error) { diff --git a/kgo.go b/kgo.go index 9babf9a..c7b645f 100644 --- a/kgo.go +++ b/kgo.go @@ -112,6 +112,7 @@ type kgoMessage struct { hdr metadata.Metadata opts broker.MessageOptions ack bool + err error } func (m *kgoMessage) Ack() error { @@ -135,6 +136,10 @@ func (m *kgoMessage) Topic() string { return "" } +func (m *kgoMessage) Error() error { + return m.err +} + func (m *kgoMessage) Unmarshal(dst interface{}, opts ...codec.Option) error { return m.c.Unmarshal(m.body, dst) } @@ -519,15 +524,6 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac options := broker.NewSubscribeOptions(opts...) - switch handler.(type) { - default: - return nil, broker.ErrInvalidHandler - case func(broker.Message) error: - break - case func([]broker.Message) error: - break - } - if options.Group == "" { uid, err := id.New() if err != nil { @@ -584,6 +580,7 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac kgo.OnPartitionsLost(sub.lost), kgo.AutoCommitCallback(sub.autocommit), kgo.AutoCommitMarks(), + kgo.WithHooks(sub), ) if options.Context != nil { diff --git a/kgo_test.go b/kgo_test.go index 0bfb552..47ee974 100644 --- a/kgo_test.go +++ b/kgo_test.go @@ -82,7 +82,7 @@ func TestFail(t *testing.T) { for _, msg := range msgs { // t.Logf("broker publish") if err := b.Publish(ctx, "test.fail", msg); err != nil { - t.Fatal(err) + // t.Fatal(err) } } }() diff --git a/meter.go b/meter.go index 104f661..a1618e8 100644 --- a/meter.go +++ b/meter.go @@ -14,18 +14,18 @@ type hookMeter struct { } var ( - _ kgo.HookBrokerConnect = &hookMeter{} - _ kgo.HookBrokerDisconnect = &hookMeter{} + _ kgo.HookBrokerConnect = (*hookMeter)(nil) + _ kgo.HookBrokerDisconnect = (*hookMeter)(nil) // HookBrokerE2E - _ kgo.HookBrokerRead = &hookMeter{} - _ kgo.HookBrokerThrottle = &hookMeter{} - _ kgo.HookBrokerWrite = &hookMeter{} - _ kgo.HookFetchBatchRead = &hookMeter{} + _ kgo.HookBrokerRead = (*hookMeter)(nil) + _ kgo.HookBrokerThrottle = (*hookMeter)(nil) + _ kgo.HookBrokerWrite = (*hookMeter)(nil) + _ kgo.HookFetchBatchRead = (*hookMeter)(nil) // HookFetchRecordBuffered // HookFetchRecordUnbuffered - _ kgo.HookGroupManageError = &hookMeter{} + _ kgo.HookGroupManageError = (*hookMeter)(nil) // HookNewClient - _ kgo.HookProduceBatchWritten = &hookMeter{} + _ kgo.HookProduceBatchWritten = (*hookMeter)(nil) // HookProduceRecordBuffered // HookProduceRecordPartitioned // HookProduceRecordUnbuffered diff --git a/subscriber.go b/subscriber.go index 5a7b300..00ab5cc 100644 --- a/subscriber.go +++ b/subscriber.go @@ -3,6 +3,8 @@ package kgo import ( "context" "fmt" + "maps" + "net" "strconv" "sync" "sync/atomic" @@ -104,13 +106,24 @@ func (s *Subscriber) poll(ctx context.Context) { fetches := s.c.PollRecords(ctx, maxInflight) if !s.closed && fetches.IsClientClosed() { s.closed = true + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(kgo.ErrClientClosed, tp.t, tp.p) + } + } return } fetches.EachError(func(t string, p int32, err error) { - if kgo.IsRetryableBrokerErr(err) { - s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err) - } else { - s.kopts.Logger.Fatal(ctx, fmt.Sprintf("[kgo] fetch topic %s partition %d error", t, p), err) + tps := tp{t, p} + s.mu.Lock() + c := s.consumers[tps] + s.mu.Unlock() + if c != nil { + c.recs <- newErrorFetchTopicPartition(kgo.ErrClientClosed, t, p) } }) @@ -138,7 +151,7 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) s.mu.Lock() pc, ok := s.consumers[tps] s.mu.Unlock() - if !ok { + if !ok || pc == nil { continue } s.mu.Lock() @@ -154,11 +167,21 @@ func (s *Subscriber) killConsumers(ctx context.Context, lost map[string][]int32) } } -func (s *Subscriber) autocommit(_ *kgo.Client, _ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { +func (s *Subscriber) autocommit(_ *kgo.Client, r *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) { if err != nil { - // s.connected.Store(0) - if s.fatalOnError { - s.kopts.Logger.Fatal(context.TODO(), "kgo.AutoCommitCallback error", err) + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + + for _, tc := range r.Topics { + for _, c := range s.consumers { + if c != nil { + for _, p := range tc.Partitions { + c.recs <- newErrorFetchTopicPartition(err, tc.Topic, p.Partition) + } + } + } } } } @@ -168,7 +191,6 @@ func (s *Subscriber) lost(ctx context.Context, _ *kgo.Client, lost map[string][] s.kopts.Logger.Error(ctx, fmt.Sprintf("[kgo] lost %#+v", lost)) } s.killConsumers(ctx, lost) - // s.connected.Store(0) } func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[string][]int32) { @@ -178,7 +200,6 @@ func (s *Subscriber) revoked(ctx context.Context, c *kgo.Client, revoked map[str s.killConsumers(ctx, revoked) if err := c.CommitMarkedOffsets(ctx); err != nil { s.kopts.Logger.Error(ctx, "[kgo] revoked CommitMarkedOffsets error", err) - // s.connected.Store(0) } } @@ -223,6 +244,27 @@ func (pc *consumer) consume() { case <-pc.quit: return case p := <-pc.recs: + if p.Err != nil || p.FetchPartition.Err != nil { + + if p.Err != nil { + pm = pc.newErrorMessage(p.Err, p.Topic, p.Partition) + } else if p.FetchPartition.Err != nil { + pm = pc.newErrorMessage(p.FetchPartition.Err, p.Topic, p.Partition) + } + + switch h := pc.handler.(type) { + case func(broker.Message) error: + _ = h(pm) + case func([]broker.Message) error: + _ = h([]broker.Message{pm}) + } + if pc.messagePool { + messagePool.Put(pm) + } + + return + } + for _, record := range p.Records { ctx, sp := pc.htracer.WithProcessSpan(record) ts := time.Now() @@ -233,19 +275,23 @@ func (pc *consumer) consume() { } else { pm = &kgoMessage{} } + pm.body = record.Value pm.topic = record.Topic pm.ack = false pm.hdr = metadata.New(len(record.Headers)) pm.ctx = ctx + for _, hdr := range record.Headers { pm.hdr.Set(hdr.Key, string(hdr.Value)) } + pm.hdr.Set("Micro-Offset", strconv.FormatInt(record.Offset, 10)) pm.hdr.Set("Micro-Partition", strconv.FormatInt(int64(record.Partition), 10)) pm.hdr.Set("Micro-Topic", record.Topic) pm.hdr.Set("Micro-Key", string(record.Key)) pm.hdr.Set("Micro-Timestamp", strconv.FormatInt(record.Timestamp.Unix(), 10)) + switch h := pc.handler.(type) { case func(broker.Message) error: err = h(pm) @@ -269,22 +315,146 @@ func (pc *consumer) consume() { ack := pm.ack if pc.messagePool { - messagePool.Put(p) - } - if ack { - pc.c.MarkCommitRecords(record) - } else { - if sp != nil { - sp.Finish() - } - // pc.connected.Store(0) - pc.kopts.Logger.Fatal(pc.kopts.Context, "[kgo] message not commited") - return + messagePool.Put(pm) } + if sp != nil { sp.Finish() } + + if ack { + pc.c.MarkCommitRecords(record) + continue + } + + pm := pc.newErrorMessage(ErrLostMessage, p.Topic, p.Partition) + switch h := pc.handler.(type) { + case func(broker.Message) error: + _ = h(pm) + case func([]broker.Message) error: + _ = h([]broker.Message{pm}) + } + if pc.messagePool { + messagePool.Put(pm) + } + return } } } } + +func (pc *consumer) newErrorMessage(err error, t string, p int32) *kgoMessage { + var pm *kgoMessage + + if pc.messagePool { + pm = messagePool.Get().(*kgoMessage) + } else { + pm = &kgoMessage{} + } + + pm.err = err + pm.topic = t + pm.hdr = metadata.New(2) + pm.ctx = context.Background() + pm.hdr.Set("Micro-Partition", strconv.FormatInt(int64(p), 10)) + pm.hdr.Set("Micro-Topic", t) + + return pm +} + +func newErrorFetchTopicPartition(err error, t string, p int32) kgo.FetchTopicPartition { + return kgo.FetchTopicPartition{ + Topic: t, + FetchPartition: kgo.FetchPartition{ + Partition: p, + Err: err, + }, + } +} + +var ( + _ kgo.HookBrokerConnect = (*Subscriber)(nil) + _ kgo.HookBrokerDisconnect = (*Subscriber)(nil) + _ kgo.HookBrokerRead = (*Subscriber)(nil) + _ kgo.HookBrokerWrite = (*Subscriber)(nil) + _ kgo.HookGroupManageError = (*Subscriber)(nil) + _ kgo.HookProduceRecordUnbuffered = (*Subscriber)(nil) +) + +func (s *Subscriber) OnGroupManageError(err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnBrokerConnect(_ kgo.BrokerMetadata, _ time.Duration, _ net.Conn, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnBrokerDisconnect(_ kgo.BrokerMetadata, _ net.Conn) { +} + +func (s *Subscriber) OnBrokerWrite(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnBrokerRead(_ kgo.BrokerMetadata, _ int16, _ int, _ time.Duration, _ time.Duration, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +} + +func (s *Subscriber) OnProduceRecordUnbuffered(_ *kgo.Record, err error) { + if err == nil { + return + } + s.mu.Lock() + tpc := make(map[tp]*consumer, len(s.consumers)) + maps.Copy(tpc, s.consumers) + s.mu.Unlock() + for tp, c := range tpc { + if c != nil { + c.recs <- newErrorFetchTopicPartition(err, tp.t, tp.p) + } + } +}