subscribe to consume

This commit is contained in:
Asim Aslam
2020-10-17 10:33:30 +01:00
parent 1b8ccd13a6
commit d7295810da
2 changed files with 15 additions and 19 deletions

View File

@@ -46,15 +46,15 @@ func runTestStream(t *testing.T, stream events.Stream) {
assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error")
})
// TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the
// TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the
// same test topic.
t.Run("TestSubscribeTopic", func(t *testing.T) {
t.Run("TestConsumeTopic", func(t *testing.T) {
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the subscriber
evChan, err := stream.Subscribe("test")
assert.Nilf(t, err, "Subscribe should not return an error")
evChan, err := stream.Consume("test")
assert.Nilf(t, err, "Consume should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
@@ -86,17 +86,17 @@ func runTestStream(t *testing.T, stream events.Stream) {
wg.Wait()
})
// TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume
// TestConsumeQueue will publish a message to a random topic. Two subscribers will then consume
// the message from the firehose topic with different queues. The second subscriber will be registered
// after the message is published to test durability.
t.Run("TestSubscribeQueue", func(t *testing.T) {
t.Run("TestConsumeQueue", func(t *testing.T) {
topic := uuid.New().String()
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the first subscriber
evChan1, err := stream.Subscribe(topic)
assert.Nilf(t, err, "Subscribe should not return an error")
evChan1, err := stream.Consume(topic)
assert.Nilf(t, err, "Consume should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
@@ -125,11 +125,11 @@ func runTestStream(t *testing.T, stream events.Stream) {
wg.Add(2)
// create the second subscriber
evChan2, err := stream.Subscribe(topic,
evChan2, err := stream.Consume(topic,
events.WithQueue("second_queue"),
events.WithStartAtTime(time.Now().Add(time.Minute*-1)),
)
assert.Nilf(t, err, "Subscribe should not return an error")
assert.Nilf(t, err, "Consume should not return an error")
go func() {
timeout := time.NewTimer(time.Second * 1)
@@ -155,7 +155,7 @@ func runTestStream(t *testing.T, stream events.Stream) {
})
t.Run("AckingNacking", func(t *testing.T) {
ch, err := stream.Subscribe("foobarAck", events.WithAutoAck(false, 5*time.Second))
ch, err := stream.Consume("foobarAck", events.WithAutoAck(false, 5*time.Second))
assert.NoError(t, err, "Unexpected error subscribing")
assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"}))
assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"}))
@@ -176,7 +176,7 @@ func runTestStream(t *testing.T, stream events.Stream) {
})
t.Run("Retries", func(t *testing.T) {
ch, err := stream.Subscribe("foobarRetries", events.WithAutoAck(false, 5*time.Second), events.WithRetryLimit(1))
ch, err := stream.Consume("foobarRetries", events.WithAutoAck(false, 5*time.Second), events.WithRetryLimit(1))
assert.NoError(t, err, "Unexpected error subscribing")
assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"}))
@@ -195,7 +195,7 @@ func runTestStream(t *testing.T, stream events.Stream) {
})
t.Run("InfiniteRetries", func(t *testing.T) {
ch, err := stream.Subscribe("foobarRetriesInf", events.WithAutoAck(false, 2*time.Second))
ch, err := stream.Consume("foobarRetriesInf", events.WithAutoAck(false, 2*time.Second))
assert.NoError(t, err, "Unexpected error subscribing")
assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"}))
@@ -221,9 +221,9 @@ func runTestStream(t *testing.T, stream events.Stream) {
})
t.Run("twoSubs", func(t *testing.T) {
ch1, err := stream.Subscribe("foobarTwoSubs1", events.WithAutoAck(false, 5*time.Second))
ch1, err := stream.Consume("foobarTwoSubs1", events.WithAutoAck(false, 5*time.Second))
assert.NoError(t, err, "Unexpected error subscribing to topic 1")
ch2, err := stream.Subscribe("foobarTwoSubs2", events.WithAutoAck(false, 5*time.Second))
ch2, err := stream.Consume("foobarTwoSubs2", events.WithAutoAck(false, 5*time.Second))
assert.NoError(t, err, "Unexpected error subscribing to topic 2")
assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"}))

View File

@@ -1,10 +1,6 @@
// Package metrics is an interface for instrumentation.
package metrics
import (
"time"
)
type Fields map[string]string
// Metrics provides a way to instrument application data