pass errors from broker to subscribers
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
15
kgo.go
15
kgo.go
@@ -112,6 +112,7 @@ type kgoMessage struct {
|
||||
hdr metadata.Metadata
|
||||
opts broker.MessageOptions
|
||||
ack bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (m *kgoMessage) Ack() error {
|
||||
@@ -135,6 +136,10 @@ func (m *kgoMessage) Topic() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *kgoMessage) Error() error {
|
||||
return m.err
|
||||
}
|
||||
|
||||
func (m *kgoMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
|
||||
return m.c.Unmarshal(m.body, dst)
|
||||
}
|
||||
@@ -519,15 +524,6 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
||||
|
||||
options := broker.NewSubscribeOptions(opts...)
|
||||
|
||||
switch handler.(type) {
|
||||
default:
|
||||
return nil, broker.ErrInvalidHandler
|
||||
case func(broker.Message) error:
|
||||
break
|
||||
case func([]broker.Message) error:
|
||||
break
|
||||
}
|
||||
|
||||
if options.Group == "" {
|
||||
uid, err := id.New()
|
||||
if err != nil {
|
||||
@@ -584,6 +580,7 @@ func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interfac
|
||||
kgo.OnPartitionsLost(sub.lost),
|
||||
kgo.AutoCommitCallback(sub.autocommit),
|
||||
kgo.AutoCommitMarks(),
|
||||
kgo.WithHooks(sub),
|
||||
)
|
||||
|
||||
if options.Context != nil {
|
||||
|
||||
Reference in New Issue
Block a user