From b6770e34bdf67596597c213c30feda061ebe28f3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 7 Mar 2020 13:32:35 +0300 Subject: [PATCH] broker error handler support and interface changes (#501) * broker: update to micro broker changes Signed-off-by: Vasiliy Tolstov --- go.mod | 2 +- go.sum | 3 +++ stan.go | 13 ++++++++++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 91ea92c..f56c594 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/google/uuid v1.1.1 - github.com/micro/go-micro/v2 v2.1.2 + github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d github.com/nats-io/nats-streaming-server v0.16.2 // indirect github.com/nats-io/stan.go v0.6.0 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect diff --git a/go.sum b/go.sum index 194a57b..7b1bda2 100644 --- a/go.sum +++ b/go.sum @@ -283,6 +283,8 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM= github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg= github.com/micro/go-micro/v2 v2.1.2 h1:r2pu9OckjG+vHD1Ttpwbsj9UnYAHnEiYa3ND1ejL6Do= github.com/micro/go-micro/v2 v2.1.2/go.mod h1:6RewFTFMI5H5CbuQymu4eS0cFtqYsdGFruMflWT36IQ= +github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d h1:Rz+SOJiYuj4mWO3Z1qj2iTt5njWFjxR8Jv7g0LkDwVc= +github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d/go.mod h1:JxIKgdCqe9hhdUOAyd2uWaCpRdSn9dWq8wnwlo8qodk= github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE= github.com/micro/mdns v0.3.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc= github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -354,6 +356,7 @@ github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgF github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/stan.go b/stan.go index 0031064..a007528 100644 --- a/stan.go +++ b/stan.go @@ -43,6 +43,7 @@ type publication struct { t string msg *stan.Msg m *broker.Message + err error } func init() { @@ -61,6 +62,10 @@ func (n *publication) Ack() error { return n.msg.Ack() } +func (n *publication) Error() error { + return n.err +} + func (n *subscriber) Options() broker.SubscribeOptions { return n.opts } @@ -337,16 +342,18 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro fn := func(msg *stan.Msg) { var m broker.Message + p := &publication{m: &m, msg: msg, t: msg.Subject} // unmarshal message if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil { + p.err = err + p.m.Body = msg.Data return } - // execute the handler - err := handler(&publication{m: &m, msg: msg, t: msg.Subject}) + p.err = handler(p) // if there's no error and success auto ack is enabled ack it - if err == nil && ackSuccess { + if p.err == nil && ackSuccess { msg.Ack() } }