From 724e2b5830828fb7a58375fd33a5d4e323223ae2 Mon Sep 17 00:00:00 2001 From: Dominic Wong Date: Fri, 4 Sep 2020 08:31:49 +0100 Subject: [PATCH] Memory events stream not pushing publications correctly (#1984) * subs not filtered correctly on publish. simplify retry logic * check fo invalid ackwait --- events/stream/memory/memory.go | 32 ++++++++++---------- events/stream/test/stream_test.go | 50 +++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 17 deletions(-) diff --git a/events/stream/memory/memory.go b/events/stream/memory/memory.go index 83bffe93..f83bac7b 100644 --- a/events/stream/memory/memory.go +++ b/events/stream/memory/memory.go @@ -34,11 +34,10 @@ type subscriber struct { Channel chan events.Event sync.RWMutex - retryMap map[string]int - retryLimit int - trackRetries bool - autoAck bool - ackWait time.Duration + retryMap map[string]int + retryLimit int + autoAck bool + ackWait time.Duration } type mem struct { @@ -119,19 +118,18 @@ func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan ev // setup the subscriber sub := &subscriber{ - Channel: make(chan events.Event), - Topic: topic, - Queue: options.Queue, - retryMap: map[string]int{}, - autoAck: true, + Channel: make(chan events.Event), + Topic: topic, + Queue: options.Queue, + retryMap: map[string]int{}, + autoAck: true, + retryLimit: options.GetRetryLimit(), } - if options.CustomRetries { - sub.trackRetries = true - sub.retryLimit = options.GetRetryLimit() - - } if !options.AutoAck { + if options.AckWait == 0 { + return nil, fmt.Errorf("invalid AckWait passed, should be positive integer") + } sub.autoAck = options.AutoAck sub.ackWait = options.AckWait } @@ -193,7 +191,7 @@ func (m *mem) handleEvent(ev *events.Event) { } // send the message to each channel async (since one channel might be blocked) - for _, sub := range subs { + for _, sub := range filteredSubs { sendEvent(ev, sub) } } @@ -219,7 +217,7 @@ func sendEvent(ev *events.Event, sub *subscriber) { break } - if s.trackRetries && count > s.retryLimit { + if s.retryLimit > -1 && count > s.retryLimit { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) } diff --git a/events/stream/test/stream_test.go b/events/stream/test/stream_test.go index 497074b4..0186ccc4 100644 --- a/events/stream/test/stream_test.go +++ b/events/stream/test/stream_test.go @@ -200,4 +200,54 @@ func runTestStream(t *testing.T, stream events.Stream) { } }) + + t.Run("InfiniteRetries", func(t *testing.T) { + ch, err := stream.Subscribe("foobarRetriesInf", events.WithAutoAck(false, 2*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"})) + + count := 0 + id := "" + for { + select { + case ev := <-ch: + if id != "" { + assert.Equal(t, id, ev.ID, "Nacked message should have been received again") + } + id = ev.ID + case <-time.After(3 * time.Second): + t.Fatalf("Unexpected event received") + } + + count++ + if count == 11 { + break + } + } + + }) + + t.Run("twoSubs", func(t *testing.T) { + ch1, err := stream.Subscribe("foobarTwoSubs1", events.WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing to topic 1") + ch2, err := stream.Subscribe("foobarTwoSubs2", events.WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing to topic 2") + + assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"})) + assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"})) + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + ev := <-ch1 + assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic") + wg.Done() + }() + go func() { + ev := <-ch2 + assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic") + wg.Done() + }() + wg.Wait() + }) }