Compare commits

...

2 Commits

Author SHA1 Message Date
fe5d474f36 fixup error
Some checks failed
coverage / build (push) Has been cancelled
test / test (push) Has been cancelled
sync / sync (push) Successful in 11s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-02 21:46:03 +03:00
d000ac6843 pass errors from broker to subscribers
Some checks are pending
coverage / build (push) Waiting to run
test / test (push) Waiting to run
sync / sync (push) Has started running
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-12-02 21:35:35 +03:00

View File

@@ -123,7 +123,7 @@ func (s *Subscriber) poll(ctx context.Context) {
c := s.consumers[tps] c := s.consumers[tps]
s.mu.Unlock() s.mu.Unlock()
if c != nil { if c != nil {
c.recs <- newErrorFetchTopicPartition(kgo.ErrClientClosed, t, p) c.recs <- newErrorFetchTopicPartition(err, t, p)
} }
}) })
@@ -352,6 +352,8 @@ func (pc *consumer) newErrorMessage(err error, t string, p int32) *kgoMessage {
pm = &kgoMessage{} pm = &kgoMessage{}
} }
pm.ack = false
pm.body = nil
pm.err = err pm.err = err
pm.topic = t pm.topic = t
pm.hdr = metadata.New(2) pm.hdr = metadata.New(2)