events: update interface (#1954)
This commit is contained in:
@@ -16,7 +16,6 @@ import (
|
||||
|
||||
const (
|
||||
defaultClusterID = "micro"
|
||||
eventsTopic = "events"
|
||||
)
|
||||
|
||||
// NewStream returns an initialized nats stream or an error if the connection to the nats
|
||||
@@ -59,7 +58,7 @@ type stream struct {
|
||||
}
|
||||
|
||||
// Publish a message to a topic
|
||||
func (s *stream) Publish(topic string, opts ...events.PublishOption) error {
|
||||
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return events.ErrMissingTopic
|
||||
@@ -75,10 +74,10 @@ func (s *stream) Publish(topic string, opts ...events.PublishOption) error {
|
||||
|
||||
// encode the message if it's not already encoded
|
||||
var payload []byte
|
||||
if p, ok := options.Payload.([]byte); ok {
|
||||
if p, ok := msg.([]byte); ok {
|
||||
payload = p
|
||||
} else {
|
||||
p, err := json.Marshal(options.Payload)
|
||||
p, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return events.ErrEncodingMessage
|
||||
}
|
||||
@@ -100,11 +99,6 @@ func (s *stream) Publish(topic string, opts ...events.PublishOption) error {
|
||||
return errors.Wrap(err, "Error encoding event")
|
||||
}
|
||||
|
||||
// publish the event to the events channel
|
||||
if _, err := s.conn.PublishAsync(eventsTopic, bytes, nil); err != nil {
|
||||
return errors.Wrap(err, "Error publishing message to events")
|
||||
}
|
||||
|
||||
// publish the event to the topic's channel
|
||||
if _, err := s.conn.PublishAsync(event.Topic, bytes, nil); err != nil {
|
||||
return errors.Wrap(err, "Error publishing message to topic")
|
||||
@@ -114,10 +108,14 @@ func (s *stream) Publish(topic string, opts ...events.PublishOption) error {
|
||||
}
|
||||
|
||||
// Subscribe to a topic
|
||||
func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, error) {
|
||||
func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return nil, events.ErrMissingTopic
|
||||
}
|
||||
|
||||
// parse the options
|
||||
options := events.SubscribeOptions{
|
||||
Topic: eventsTopic,
|
||||
Queue: uuid.New().String(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
@@ -147,7 +145,7 @@ func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event,
|
||||
|
||||
// setup the options
|
||||
subOpts := []stan.SubscriptionOption{
|
||||
stan.DurableName(options.Topic),
|
||||
stan.DurableName(topic),
|
||||
stan.SetManualAckMode(),
|
||||
}
|
||||
if options.StartAtTime.Unix() > 0 {
|
||||
@@ -155,7 +153,7 @@ func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event,
|
||||
}
|
||||
|
||||
// connect the subscriber
|
||||
_, err := s.conn.QueueSubscribe(options.Topic, options.Queue, handleMsg, subOpts...)
|
||||
_, err := s.conn.QueueSubscribe(topic, options.Queue, handleMsg, subOpts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error subscribing to topic")
|
||||
}
|
||||
|
@@ -36,53 +36,10 @@ func TestStream(t *testing.T) {
|
||||
|
||||
// TestMissingTopic will test the topic validation on publish
|
||||
t.Run("TestMissingTopic", func(t *testing.T) {
|
||||
err := stream.Publish("")
|
||||
err := stream.Publish("", nil)
|
||||
assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error")
|
||||
})
|
||||
|
||||
// TestFirehose will publish a message to the test topic. The subscriber will subscribe to the
|
||||
// firehose topic (indicated by a lack of the topic option).
|
||||
t.Run("TestFirehose", func(t *testing.T) {
|
||||
payload := &testPayload{Message: "HelloWorld"}
|
||||
metadata := map[string]string{"foo": "bar"}
|
||||
|
||||
// create the subscriber
|
||||
evChan, err := stream.Subscribe()
|
||||
assert.Nilf(t, err, "Subscribe should not return an error")
|
||||
|
||||
// setup the subscriber async
|
||||
var wg sync.WaitGroup
|
||||
|
||||
go func() {
|
||||
timeout := time.NewTimer(time.Millisecond * 250)
|
||||
|
||||
select {
|
||||
case event, _ := <-evChan:
|
||||
assert.NotNilf(t, event, "The message was nil")
|
||||
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
|
||||
|
||||
var result testPayload
|
||||
err = event.Unmarshal(&result)
|
||||
assert.Nil(t, err, "Error decoding result")
|
||||
assert.Equal(t, result, *payload, "Payload didn't match")
|
||||
|
||||
wg.Done()
|
||||
case <-timeout.C:
|
||||
t.Fatalf("Event was not recieved")
|
||||
}
|
||||
}()
|
||||
|
||||
err = stream.Publish("test",
|
||||
events.WithPayload(payload),
|
||||
events.WithMetadata(metadata),
|
||||
)
|
||||
assert.Nil(t, err, "Publishing a valid message should not return an error")
|
||||
wg.Add(1)
|
||||
|
||||
// wait for the subscriber to recieve the message or timeout
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
// TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the
|
||||
// same test topic.
|
||||
t.Run("TestSubscribeTopic", func(t *testing.T) {
|
||||
@@ -90,7 +47,7 @@ func TestStream(t *testing.T) {
|
||||
metadata := map[string]string{"foo": "bar"}
|
||||
|
||||
// create the subscriber
|
||||
evChan, err := stream.Subscribe(events.WithTopic("test"))
|
||||
evChan, err := stream.Subscribe("test")
|
||||
assert.Nilf(t, err, "Subscribe should not return an error")
|
||||
|
||||
// setup the subscriber async
|
||||
@@ -115,10 +72,7 @@ func TestStream(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err = stream.Publish("test",
|
||||
events.WithPayload(payload),
|
||||
events.WithMetadata(metadata),
|
||||
)
|
||||
err = stream.Publish("test", payload, events.WithMetadata(metadata))
|
||||
assert.Nil(t, err, "Publishing a valid message should not return an error")
|
||||
wg.Add(1)
|
||||
|
||||
@@ -135,7 +89,7 @@ func TestStream(t *testing.T) {
|
||||
metadata := map[string]string{"foo": "bar"}
|
||||
|
||||
// create the first subscriber
|
||||
evChan1, err := stream.Subscribe(events.WithTopic(topic))
|
||||
evChan1, err := stream.Subscribe(topic)
|
||||
assert.Nilf(t, err, "Subscribe should not return an error")
|
||||
|
||||
// setup the subscriber async
|
||||
@@ -160,16 +114,12 @@ func TestStream(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err = stream.Publish(topic,
|
||||
events.WithPayload(payload),
|
||||
events.WithMetadata(metadata),
|
||||
)
|
||||
err = stream.Publish(topic, payload, events.WithMetadata(metadata))
|
||||
assert.Nil(t, err, "Publishing a valid message should not return an error")
|
||||
wg.Add(2)
|
||||
|
||||
// create the second subscriber
|
||||
evChan2, err := stream.Subscribe(
|
||||
events.WithTopic(topic),
|
||||
evChan2, err := stream.Subscribe(topic,
|
||||
events.WithQueue("second_queue"),
|
||||
events.WithStartAtTime(time.Now().Add(time.Minute*-1)),
|
||||
)
|
||||
|
Reference in New Issue
Block a user