diff --git a/broker/http_broker.go b/broker/http_broker.go index 53dd48f1..41d80f5d 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -533,8 +533,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) s, err := h.r.GetService(serviceName) if err != nil { h.RUnlock() - // ignore error - return nil + return err } h.RUnlock() diff --git a/sync/task/broker/broker.go b/sync/task/broker/broker.go index f643fb2d..45c20b3e 100644 --- a/sync/task/broker/broker.go +++ b/sync/task/broker/broker.go @@ -99,14 +99,22 @@ func (t *Task) Run(c task.Command) error { // subscribe for the pool size for i := 0; i < t.Options.Pool; i++ { - // subscribe to work - subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i))) + err := func() error { + // subscribe to work + subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i))) + if err != nil { + return err + } + + // unsubscribe on completion + defer subWork.Unsubscribe() + + return nil + }() + if err != nil { return err } - - // unsubscribe on completion - defer subWork.Unsubscribe() } // subscribe to all status messages