diff --git a/broker/broker.go b/broker/broker.go index 97845f12..b1ce35bd 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -28,6 +28,7 @@ type Event interface { Topic() string Message() *Message Ack() error + Error() error } // Subscriber is a convenience return type for the Subscribe method diff --git a/broker/default.go b/broker/default.go index b8b8ffe1..05eef916 100644 --- a/broker/default.go +++ b/broker/default.go @@ -53,8 +53,9 @@ type subscriber struct { } type publication struct { - t string - m *Message + t string + err error + m *Message } func (p *publication) Topic() string { @@ -70,6 +71,10 @@ func (p *publication) Ack() error { return nil } +func (p *publication) Error() error { + return p.err +} + func (s *subscriber) Options() SubscribeOptions { return s.opts } @@ -390,10 +395,26 @@ func (n *natsBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO fn := func(msg *nats.Msg) { var m Message - if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { + pub := &publication{t: msg.Subject} + eh := n.opts.ErrorHandler + err := n.opts.Codec.Unmarshal(msg.Data, &m) + pub.err = err + pub.m = &m + if err != nil { + m.Body = msg.Data + log.Error(err) + if eh != nil { + eh(pub) + } return } - handler(&publication{m: &m, t: msg.Subject}) + if err := handler(pub); err != nil { + pub.err = err + log.Error(err) + if eh != nil { + eh(pub) + } + } } var sub *nats.Subscription diff --git a/broker/memory/memory.go b/broker/memory/memory.go index 1f9bea86..2a449467 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -27,6 +27,7 @@ type memoryBroker struct { type memoryEvent struct { opts broker.Options topic string + err error message interface{} } @@ -120,6 +121,11 @@ func (m *memoryBroker) Publish(topic string, msg *broker.Message, opts ...broker for _, sub := range subs { if err := sub.handler(p); err != nil { + p.err = err + if eh := m.opts.ErrorHandler; eh != nil { + eh(p) + continue + } return err } } @@ -197,6 +203,10 @@ func (m *memoryEvent) Ack() error { return nil } +func (m *memoryEvent) Error() error { + return m.err +} + func (m *memorySubscriber) Options() broker.SubscribeOptions { return m.opts } diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 89bfd1e7..1af597eb 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -50,8 +50,9 @@ type subscriber struct { } type publication struct { - t string - m *broker.Message + t string + err error + m *broker.Message } func (p *publication) Topic() string { @@ -67,6 +68,10 @@ func (p *publication) Ack() error { return nil } +func (p *publication) Error() error { + return p.err +} + func (s *subscriber) Options() broker.SubscribeOptions { return s.opts } @@ -375,10 +380,26 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro fn := func(msg *nats.Msg) { var m broker.Message - if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { + pub := &publication{t: msg.Subject} + eh := n.opts.ErrorHandler + err := n.opts.Codec.Unmarshal(msg.Data, &m) + pub.err = err + pub.m = &m + if err != nil { + m.Body = msg.Data + log.Error(err) + if eh != nil { + eh(pub) + } return } - handler(&publication{m: &m, t: msg.Subject}) + if err := handler(pub); err != nil { + pub.err = err + log.Error(err) + if eh != nil { + eh(pub) + } + } } var sub *nats.Subscription diff --git a/broker/options.go b/broker/options.go index 3fdc4dbc..b3317d24 100644 --- a/broker/options.go +++ b/broker/options.go @@ -9,9 +9,14 @@ import ( ) type Options struct { - Addrs []string - Secure bool - Codec codec.Marshaler + Addrs []string + Secure bool + Codec codec.Marshaler + + // Handler executed when error happens in broker mesage + // processing + ErrorHandler Handler + TLSConfig *tls.Config // Registry used for clustering Registry registry.Registry @@ -81,6 +86,14 @@ func DisableAutoAck() SubscribeOption { } } +// ErrorHandler will catch all broker errors that cant be handled +// in normal way, for example Codec errors +func ErrorHandler(h Handler) Option { + return func(o *Options) { + o.ErrorHandler = h + } +} + // Queue sets the name of the queue to share messages on func Queue(name string) SubscribeOption { return func(o *SubscribeOptions) { diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go index 2e92ad43..662d6d50 100644 --- a/broker/service/subscriber.go +++ b/broker/service/subscriber.go @@ -17,6 +17,7 @@ type serviceSub struct { type serviceEvent struct { topic string + err error message *broker.Message } @@ -32,6 +33,10 @@ func (s *serviceEvent) Ack() error { return nil } +func (s *serviceEvent) Error() error { + return s.err +} + func (s *serviceSub) isClosed() bool { select { case <-s.closed: @@ -71,14 +76,14 @@ func (s *serviceSub) run() error { return err } - // TODO: handle error - s.handler(&serviceEvent{ + p := &serviceEvent{ topic: s.topic, message: &broker.Message{ Header: msg.Header, Body: msg.Body, }, - }) + } + p.err = s.handler(p) } } diff --git a/server/rpc_event.go b/server/rpc_event.go index fe2a946a..f45f6c87 100644 --- a/server/rpc_event.go +++ b/server/rpc_event.go @@ -7,6 +7,7 @@ import ( // event is a broker event we handle on the server transport type event struct { + err error message *broker.Message } @@ -19,6 +20,10 @@ func (e *event) Message() *broker.Message { return e.message } +func (e *event) Error() error { + return e.err +} + func (e *event) Topic() string { return e.message.Header["Micro-Topic"] } diff --git a/tunnel/broker/broker.go b/tunnel/broker/broker.go index 1b904e95..259115dd 100644 --- a/tunnel/broker/broker.go +++ b/tunnel/broker/broker.go @@ -163,6 +163,10 @@ func (t *tunEvent) Ack() error { return nil } +func (t *tunEvent) Error() error { + return nil +} + func NewBroker(opts ...broker.Option) broker.Broker { options := broker.Options{ Context: context.Background(),