broker/memory: simplify code

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-09-29 13:10:11 +03:00
parent 542f36cfa5
commit 3bc046e5d4

View File

@ -172,13 +172,18 @@ func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...Pub
} }
for _, sub := range subs { for _, sub := range subs {
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
switch {
// batch processing // batch processing
if sub.batchhandler != nil { case sub.batchhandler != nil:
if err = sub.batchhandler(ms); err != nil { if err = sub.batchhandler(ms); err != nil {
ms.SetError(err) ms.SetError(err)
if sub.opts.BatchErrorHandler != nil {
beh = sub.opts.BatchErrorHandler
}
if beh != nil { if beh != nil {
_ = beh(ms) _ = beh(ms)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {
@ -189,15 +194,11 @@ func (m *memoryBroker) publish(ctx context.Context, vs []msgWrapper, opts ...Pub
m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err) m.opts.Logger.Errorf(m.opts.Context, "ack failed: %v", err)
} }
} }
} // single processing
// single processing case sub.handler != nil:
if sub.handler != nil {
for _, p := range ms { for _, p := range ms {
if err = sub.handler(p); err != nil { if err = sub.handler(p); err != nil {
p.SetError(err) p.SetError(err)
if sub.opts.ErrorHandler != nil {
eh = sub.opts.ErrorHandler
}
if eh != nil { if eh != nil {
_ = eh(p) _ = eh(p)
} else if m.opts.Logger.V(logger.ErrorLevel) { } else if m.opts.Logger.V(logger.ErrorLevel) {