From 6bdf33c4eedec8684d989eb48d8b527f420fec0a Mon Sep 17 00:00:00 2001 From: Dominic Wong Date: Wed, 2 Sep 2020 13:28:54 +0100 Subject: [PATCH] Event stream updates (#1981) - auto and manual acking - retry limits --- .github/workflows/tests.yml | 9 +- events/events.go | 24 ++++ events/options.go | 36 +++++ events/stream/memory/memory.go | 91 +++++++++++-- events/stream/memory/memory_test.go | 134 ------------------ events/stream/nats/nats.go | 34 ++++- events/stream/nats/nats_test.go | 151 +-------------------- events/stream/test/stream_test.go | 203 ++++++++++++++++++++++++++++ 8 files changed, 383 insertions(+), 299 deletions(-) create mode 100644 events/stream/test/stream_test.go diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6268cf0b..53458695 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -7,7 +7,6 @@ jobs: name: Test repo runs-on: ubuntu-latest steps: - - name: Set up Go 1.13 uses: actions/setup-go@v1 with: @@ -34,9 +33,13 @@ jobs: kubectl apply -f runtime/kubernetes/test/test.yaml sudo mkdir -p /var/run/secrets/kubernetes.io/serviceaccount sudo chmod 777 /var/run/secrets/kubernetes.io/serviceaccount - wget -qO- https://binaries.cockroachdb.com/cockroach-v20.1.4.linux-amd64.tgz | tar xvz + wget -qO- https://binaries.cockroachdb.com/cockroach-v20.1.4.linux-amd64.tgz | tar -xvz cockroach-v20.1.4.linux-amd64/cockroach start-single-node --insecure & - go test -tags kubernetes -v ./... + wget -q https://github.com/nats-io/nats-streaming-server/releases/download/v0.18.0/nats-streaming-server-v0.18.0-linux-amd64.zip + unzip ./nats-streaming-server-v0.18.0-linux-amd64.zip + export PATH=$PATH:./nats-streaming-server-v0.18.0-linux-amd64 + nats-streaming-server & + go test -tags kubernetes,nats -v ./... - name: Notify of test failure if: failure() diff --git a/events/events.go b/events/events.go index f64e264b..76194495 100644 --- a/events/events.go +++ b/events/events.go @@ -26,6 +26,9 @@ type Store interface { Write(event *Event, opts ...WriteOption) error } +type AckFunc func() error +type NackFunc func() error + // Event is the object returned by the broker when you subscribe to a topic type Event struct { // ID to uniquely identify the event @@ -38,9 +41,30 @@ type Event struct { Metadata map[string]string // Payload contains the encoded message Payload []byte + + ackFunc AckFunc + nackFunc NackFunc } // Unmarshal the events message into an object func (e *Event) Unmarshal(v interface{}) error { return json.Unmarshal(e.Payload, v) } + +// Ack acknowledges successful processing of the event in ManualAck mode +func (e *Event) Ack() error { + return e.ackFunc() +} + +func (e *Event) SetAckFunc(f AckFunc) { + e.ackFunc = f +} + +// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode +func (e *Event) Nack() error { + return e.nackFunc() +} + +func (e *Event) SetNackFunc(f NackFunc) { + e.nackFunc = f +} diff --git a/events/options.go b/events/options.go index bba3b487..d63ee26f 100644 --- a/events/options.go +++ b/events/options.go @@ -35,6 +35,17 @@ type SubscribeOptions struct { // StartAtTime is the time from which the messages should be consumed from. If not provided then // the messages will be consumed starting from the moment the Subscription starts. StartAtTime time.Time + // AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered. + // If false specifies that each message need ts to be manually acknowledged by the subscriber. + // If processing is successful the message should be ack'ed to remove the message from the stream. + // If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will + // remain on the stream to be processed again. + AutoAck bool + AckWait time.Duration + // RetryLimit indicates number of times a message is retried + RetryLimit int + // CustomRetries indicates whether to use RetryLimit + CustomRetries bool } // SubscribeOption sets attributes on SubscribeOptions @@ -54,6 +65,31 @@ func WithStartAtTime(t time.Time) SubscribeOption { } } +// WithAutoAck sets the AutoAck field on SubscribeOptions and an ackWait duration after which if no ack is received +// the message is requeued in case auto ack is turned off +func WithAutoAck(ack bool, ackWait time.Duration) SubscribeOption { + return func(o *SubscribeOptions) { + o.AutoAck = ack + o.AckWait = ackWait + } +} + +// WithRetryLimit sets the RetryLimit field on SubscribeOptions. +// Set to -1 for infinite retries (default) +func WithRetryLimit(retries int) SubscribeOption { + return func(o *SubscribeOptions) { + o.RetryLimit = retries + o.CustomRetries = true + } +} + +func (s SubscribeOptions) GetRetryLimit() int { + if !s.CustomRetries { + return -1 + } + return s.RetryLimit +} + // WriteOptions contains all the options which can be provided when writing an event to a store type WriteOptions struct { // TTL is the duration the event should be recorded for, a zero value TTL indicates the event should diff --git a/events/stream/memory/memory.go b/events/stream/memory/memory.go index 1d06c2bb..83bffe93 100644 --- a/events/stream/memory/memory.go +++ b/events/stream/memory/memory.go @@ -32,6 +32,13 @@ type subscriber struct { Queue string Topic string Channel chan events.Event + + sync.RWMutex + retryMap map[string]int + retryLimit int + trackRetries bool + autoAck bool + ackWait time.Duration } type mem struct { @@ -102,17 +109,31 @@ func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan ev // parse the options options := events.SubscribeOptions{ - Queue: uuid.New().String(), + Queue: uuid.New().String(), + AutoAck: true, } for _, o := range opts { o(&options) } + // TODO RetryLimit // setup the subscriber sub := &subscriber{ - Channel: make(chan events.Event), - Topic: topic, - Queue: options.Queue, + Channel: make(chan events.Event), + Topic: topic, + Queue: options.Queue, + retryMap: map[string]int{}, + autoAck: true, + } + + if options.CustomRetries { + sub.trackRetries = true + sub.retryLimit = options.GetRetryLimit() + + } + if !options.AutoAck { + sub.autoAck = options.AutoAck + sub.ackWait = options.AckWait } // register the subscriber @@ -129,7 +150,7 @@ func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan ev return sub.Channel, nil } -// lookupPreviousEvents finds events for a subscriber which occured before a given time and sends +// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends // them into the subscribers channel func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { // lookup all events which match the topic (a blank topic will return all results) @@ -150,8 +171,7 @@ func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { if ev.Timestamp.Unix() < startTime.Unix() { continue } - - sub.Channel <- ev + sendEvent(&ev, sub) } } @@ -174,8 +194,59 @@ func (m *mem) handleEvent(ev *events.Event) { // send the message to each channel async (since one channel might be blocked) for _, sub := range subs { - go func(s *subscriber) { - s.Channel <- *ev - }(sub) + sendEvent(ev, sub) + } +} + +func sendEvent(ev *events.Event, sub *subscriber) { + go func(s *subscriber) { + evCopy := *ev + if s.autoAck { + s.Channel <- evCopy + return + } + evCopy.SetAckFunc(ackFunc(s, evCopy)) + evCopy.SetNackFunc(nackFunc(s, evCopy)) + s.retryMap[evCopy.ID] = 0 + tick := time.NewTicker(s.ackWait) + defer tick.Stop() + for range tick.C { + s.Lock() + count, ok := s.retryMap[evCopy.ID] + s.Unlock() + if !ok { + // success + break + } + + if s.trackRetries && count > s.retryLimit { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) + } + s.Lock() + delete(s.retryMap, evCopy.ID) + s.Unlock() + return + } + s.Channel <- evCopy + s.Lock() + s.retryMap[evCopy.ID] = count + 1 + s.Unlock() + } + }(sub) +} + +func ackFunc(s *subscriber, evCopy events.Event) func() error { + return func() error { + s.Lock() + delete(s.retryMap, evCopy.ID) + s.Unlock() + return nil + } +} + +func nackFunc(s *subscriber, evCopy events.Event) func() error { + return func() error { + return nil } } diff --git a/events/stream/memory/memory_test.go b/events/stream/memory/memory_test.go index 93b0bbec..05af74b9 100644 --- a/events/stream/memory/memory_test.go +++ b/events/stream/memory/memory_test.go @@ -1,135 +1 @@ package memory - -import ( - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/micro/go-micro/v3/events" - "github.com/stretchr/testify/assert" -) - -type testPayload struct { - Message string -} - -func TestStream(t *testing.T) { - stream, err := NewStream() - assert.Nilf(t, err, "NewStream should not return an error") - assert.NotNilf(t, stream, "NewStream should return a stream object") - - // TestMissingTopic will test the topic validation on publish - t.Run("TestMissingTopic", func(t *testing.T) { - err := stream.Publish("", nil) - assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") - }) - - // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the - // same test topic. - t.Run("TestSubscribeTopic", func(t *testing.T) { - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the subscriber - evChan, err := stream.Subscribe("test") - assert.Nilf(t, err, "Subscribe should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - err = stream.Publish("test", payload, events.WithMetadata(metadata)) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(1) - - // wait for the subscriber to recieve the message or timeout - wg.Wait() - }) - - // TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume - // the message from the firehose topic with different queues. The second subscriber will be registered - // after the message is published to test durability. - t.Run("TestSubscribeQueue", func(t *testing.T) { - topic := uuid.New().String() - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the first subscriber - evChan1, err := stream.Subscribe(topic) - assert.Nilf(t, err, "Subscribe should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan1: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - err = stream.Publish(topic, payload, events.WithMetadata(metadata)) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(2) - - // create the second subscriber - evChan2, err := stream.Subscribe(topic, - events.WithQueue("second_queue"), - events.WithStartAtTime(time.Now().Add(time.Minute*-1)), - ) - assert.Nilf(t, err, "Subscribe should not return an error") - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan2: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - // wait for the subscriber to recieve the message or timeout - wg.Wait() - }) -} diff --git a/events/stream/nats/nats.go b/events/stream/nats/nats.go index 9e2e8365..6646ed02 100644 --- a/events/stream/nats/nats.go +++ b/events/stream/nats/nats.go @@ -116,7 +116,8 @@ func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan // parse the options options := events.SubscribeOptions{ - Queue: uuid.New().String(), + Queue: uuid.New().String(), + AutoAck: true, } for _, o := range opts { o(&options) @@ -125,19 +126,43 @@ func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan // setup the subscriber c := make(chan events.Event) handleMsg := func(m *stan.Msg) { + // poison message handling + if options.GetRetryLimit() > -1 && m.Redelivered && int(m.RedeliveryCount) > options.GetRetryLimit() { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Message retry limit reached, discarding: %v", m.Sequence) + } + m.Ack() // ignoring error + return + } + // decode the message var evt events.Event if err := json.Unmarshal(m.Data, &evt); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Error decoding message: %v", err) } - // not ackknowledging the message is the way to indicate an error occured + // not acknowledging the message is the way to indicate an error occurred return } + if !options.AutoAck { + // set up the ack funcs + evt.SetAckFunc(func() error { + return m.Ack() + }) + evt.SetNackFunc(func() error { + // noop. not acknowledging the message is the way to indicate an error occurred + // we have to wait for the ack wait to kick in before the message is resent + return nil + }) + } + // push onto the channel and wait for the consumer to take the event off before we acknowledge it. c <- evt + if !options.AutoAck { + return + } if err := m.Ack(); err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Error acknowledging message: %v", err) } @@ -149,7 +174,10 @@ func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan stan.SetManualAckMode(), } if options.StartAtTime.Unix() > 0 { - stan.StartAtTime(options.StartAtTime) + subOpts = append(subOpts, stan.StartAtTime(options.StartAtTime)) + } + if options.AckWait > 0 { + subOpts = append(subOpts, stan.AckWait(options.AckWait)) } // connect the subscriber diff --git a/events/stream/nats/nats_test.go b/events/stream/nats/nats_test.go index 69fa0482..f079b586 100644 --- a/events/stream/nats/nats_test.go +++ b/events/stream/nats/nats_test.go @@ -1,150 +1,3 @@ +// +build nats + package nats - -import ( - "net" - "os/exec" - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/micro/go-micro/v3/events" - "github.com/stretchr/testify/assert" -) - -type testPayload struct { - Message string -} - -func TestStream(t *testing.T) { - _, err := exec.LookPath("nats-streaming-server") - if err != nil { - t.Skipf("Skipping nats test, nats-streaming-server binary is not detected") - } - - conn, err := net.DialTimeout("tcp", ":4222", time.Millisecond*100) - if err != nil { - t.Skipf("Skipping nats test, could not connect to cluster on port 4222: %v", err) - } - if err := conn.Close(); err != nil { - t.Fatalf("Error closing test tcp connection to nats cluster") - } - - stream, err := NewStream(ClusterID("test-cluster")) - assert.Nilf(t, err, "NewStream should not return an error") - assert.NotNilf(t, stream, "NewStream should return a stream object") - - // TestMissingTopic will test the topic validation on publish - t.Run("TestMissingTopic", func(t *testing.T) { - err := stream.Publish("", nil) - assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") - }) - - // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the - // same test topic. - t.Run("TestSubscribeTopic", func(t *testing.T) { - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the subscriber - evChan, err := stream.Subscribe("test") - assert.Nilf(t, err, "Subscribe should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - err = stream.Publish("test", payload, events.WithMetadata(metadata)) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(1) - - // wait for the subscriber to recieve the message or timeout - wg.Wait() - }) - - // TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume - // the message from the firehose topic with different queues. The second subscriber will be registered - // after the message is published to test durability. - t.Run("TestSubscribeQueue", func(t *testing.T) { - topic := uuid.New().String() - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the first subscriber - evChan1, err := stream.Subscribe(topic) - assert.Nilf(t, err, "Subscribe should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan1: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - err = stream.Publish(topic, payload, events.WithMetadata(metadata)) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(2) - - // create the second subscriber - evChan2, err := stream.Subscribe(topic, - events.WithQueue("second_queue"), - events.WithStartAtTime(time.Now().Add(time.Minute*-1)), - ) - assert.Nilf(t, err, "Subscribe should not return an error") - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan2: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - // wait for the subscriber to recieve the message or timeout - wg.Wait() - }) -} diff --git a/events/stream/test/stream_test.go b/events/stream/test/stream_test.go new file mode 100644 index 00000000..497074b4 --- /dev/null +++ b/events/stream/test/stream_test.go @@ -0,0 +1,203 @@ +// +build nats + +package test + +import ( + "sync" + "testing" + "time" + + "github.com/micro/go-micro/v3/events" + "github.com/micro/go-micro/v3/events/stream/memory" + "github.com/micro/go-micro/v3/events/stream/nats" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +type testPayload struct { + Message string +} + +type testCase struct { + str events.Stream + name string +} + +func TestStream(t *testing.T) { + tcs := []testCase{} + + // NATS specific setup + stream, err := nats.NewStream(nats.ClusterID("test-cluster")) + assert.Nilf(t, err, "NewStream should not return an error") + assert.NotNilf(t, stream, "NewStream should return a stream object") + tcs = append(tcs, testCase{str: stream, name: "nats"}) + + stream, err = memory.NewStream() + assert.Nilf(t, err, "NewStream should not return an error") + assert.NotNilf(t, stream, "NewStream should return a stream object") + tcs = append(tcs, testCase{str: stream, name: "memory"}) + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + runTestStream(t, tc.str) + }) + } + +} + +func runTestStream(t *testing.T, stream events.Stream) { + // TestMissingTopic will test the topic validation on publish + t.Run("TestMissingTopic", func(t *testing.T) { + err := stream.Publish("", nil) + assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") + }) + + // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the + // same test topic. + t.Run("TestSubscribeTopic", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Subscribe("test") + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", payload, events.WithMetadata(metadata)) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume + // the message from the firehose topic with different queues. The second subscriber will be registered + // after the message is published to test durability. + t.Run("TestSubscribeQueue", func(t *testing.T) { + topic := uuid.New().String() + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the first subscriber + evChan1, err := stream.Subscribe(topic) + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan1: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish(topic, payload, events.WithMetadata(metadata)) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(2) + + // create the second subscriber + evChan2, err := stream.Subscribe(topic, + events.WithQueue("second_queue"), + events.WithStartAtTime(time.Now().Add(time.Minute*-1)), + ) + assert.Nilf(t, err, "Subscribe should not return an error") + + go func() { + timeout := time.NewTimer(time.Second * 1) + + select { + case event, _ := <-evChan2: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + t.Run("AckingNacking", func(t *testing.T) { + ch, err := stream.Subscribe("foobarAck", events.WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"})) + assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"})) + + ev := <-ch + ev.Ack() + ev = <-ch + nacked := ev.ID + ev.Nack() + select { + case ev = <-ch: + assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again") + assert.NoError(t, ev.Ack()) + case <-time.After(7 * time.Second): + t.Fatalf("Timed out waiting for message to be put back on queue") + } + + }) + + t.Run("Retries", func(t *testing.T) { + ch, err := stream.Subscribe("foobarRetries", events.WithAutoAck(false, 5*time.Second), events.WithRetryLimit(1)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"})) + + ev := <-ch + id := ev.ID + ev.Nack() + ev = <-ch + assert.Equal(t, id, ev.ID, "Nacked message should have been received again") + ev.Nack() + select { + case ev = <-ch: + t.Fatalf("Unexpected event received") + case <-time.After(7 * time.Second): + } + + }) +}