broker error handler support and interface changes (#501)

* broker: update to micro broker changes

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-03-07 13:32:35 +03:00
parent 68595827a1
commit b6770e34bd
3 changed files with 14 additions and 4 deletions

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.13
require (
github.com/google/uuid v1.1.1
github.com/micro/go-micro/v2 v2.1.2
github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
github.com/nats-io/stan.go v0.6.0
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect

3
go.sum
View File

@ -283,6 +283,8 @@ github.com/micro/cli/v2 v2.1.2 h1:43J1lChg/rZCC1rvdqZNFSQDrGT7qfMrtp6/ztpIkEM=
github.com/micro/cli/v2 v2.1.2/go.mod h1:EguNh6DAoWKm9nmk+k/Rg0H3lQnDxqzu5x5srOtGtYg=
github.com/micro/go-micro/v2 v2.1.2 h1:r2pu9OckjG+vHD1Ttpwbsj9UnYAHnEiYa3ND1ejL6Do=
github.com/micro/go-micro/v2 v2.1.2/go.mod h1:6RewFTFMI5H5CbuQymu4eS0cFtqYsdGFruMflWT36IQ=
github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d h1:Rz+SOJiYuj4mWO3Z1qj2iTt5njWFjxR8Jv7g0LkDwVc=
github.com/micro/go-micro/v2 v2.2.1-0.20200306212516-8ee56072549d/go.mod h1:JxIKgdCqe9hhdUOAyd2uWaCpRdSn9dWq8wnwlo8qodk=
github.com/micro/mdns v0.3.0 h1:bYycYe+98AXR3s8Nq5qvt6C573uFTDPIYzJemWON0QE=
github.com/micro/mdns v0.3.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@ -354,6 +356,7 @@ github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgF
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-buffruneio v0.2.0/go.mod h1:JkE26KsDizTr40EUHkXVtNPvgGtbSNq5BcowyYOWdKo=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

13
stan.go
View File

@ -43,6 +43,7 @@ type publication struct {
t string
msg *stan.Msg
m *broker.Message
err error
}
func init() {
@ -61,6 +62,10 @@ func (n *publication) Ack() error {
return n.msg.Ack()
}
func (n *publication) Error() error {
return n.err
}
func (n *subscriber) Options() broker.SubscribeOptions {
return n.opts
}
@ -337,16 +342,18 @@ func (n *stanBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
fn := func(msg *stan.Msg) {
var m broker.Message
p := &publication{m: &m, msg: msg, t: msg.Subject}
// unmarshal message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
p.err = err
p.m.Body = msg.Data
return
}
// execute the handler
err := handler(&publication{m: &m, msg: msg, t: msg.Subject})
p.err = handler(p)
// if there's no error and success auto ack is enabled ack it
if err == nil && ackSuccess {
if p.err == nil && ackSuccess {
msg.Ack()
}
}