diff --git a/events/events.go b/events/events.go index c9bf4575..fb55bb3f 100644 --- a/events/events.go +++ b/events/events.go @@ -16,8 +16,8 @@ var ( // Stream of events type Stream interface { - Publish(topic string, opts ...PublishOption) error - Subscribe(opts ...SubscribeOption) (<-chan Event, error) + Publish(topic string, msg interface{}, opts ...PublishOption) error + Subscribe(topic string, opts ...SubscribeOption) (<-chan Event, error) } // Store of events diff --git a/events/memory/memory.go b/events/memory/memory.go index c5c9691a..7b02b4c5 100644 --- a/events/memory/memory.go +++ b/events/memory/memory.go @@ -41,7 +41,7 @@ type mem struct { sync.RWMutex } -func (m *mem) Publish(topic string, opts ...events.PublishOption) error { +func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { // validate the topic if len(topic) == 0 { return events.ErrMissingTopic @@ -57,10 +57,10 @@ func (m *mem) Publish(topic string, opts ...events.PublishOption) error { // encode the message if it's not already encoded var payload []byte - if p, ok := options.Payload.([]byte); ok { + if p, ok := msg.([]byte); ok { payload = p } else { - p, err := json.Marshal(options.Payload) + p, err := json.Marshal(msg) if err != nil { return events.ErrEncodingMessage } @@ -94,7 +94,12 @@ func (m *mem) Publish(topic string, opts ...events.PublishOption) error { return nil } -func (m *mem) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, error) { +func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, events.ErrMissingTopic + } + // parse the options options := events.SubscribeOptions{ Queue: uuid.New().String(), @@ -106,7 +111,7 @@ func (m *mem) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, er // setup the subscriber sub := &subscriber{ Channel: make(chan events.Event), - Topic: options.Topic, + Topic: topic, Queue: options.Queue, } @@ -127,13 +132,8 @@ func (m *mem) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, er // lookupPreviousEvents finds events for a subscriber which occured before a given time and sends // them into the subscribers channel func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { - var prefix string - if len(sub.Topic) > 0 { - prefix = sub.Topic + "/" - } - // lookup all events which match the topic (a blank topic will return all results) - recs, err := m.store.Read(prefix, store.ReadPrefix()) + recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Error looking up previous events: %v", err) return diff --git a/events/memory/memory_test.go b/events/memory/memory_test.go index 69e7605a..9a58c6dc 100644 --- a/events/memory/memory_test.go +++ b/events/memory/memory_test.go @@ -21,53 +21,10 @@ func TestStream(t *testing.T) { // TestMissingTopic will test the topic validation on publish t.Run("TestMissingTopic", func(t *testing.T) { - err := stream.Publish("") + err := stream.Publish("", nil) assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") }) - // TestFirehose will publish a message to the test topic. The subscriber will subscribe to the - // firehose topic (indicated by a lack of the topic option). - t.Run("TestFirehose", func(t *testing.T) { - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the subscriber - evChan, err := stream.Subscribe() - assert.Nilf(t, err, "Subscribe should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - err = stream.Publish("test", - events.WithPayload(payload), - events.WithMetadata(metadata), - ) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(1) - - // wait for the subscriber to recieve the message or timeout - wg.Wait() - }) - // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the // same test topic. t.Run("TestSubscribeTopic", func(t *testing.T) { @@ -75,7 +32,7 @@ func TestStream(t *testing.T) { metadata := map[string]string{"foo": "bar"} // create the subscriber - evChan, err := stream.Subscribe(events.WithTopic("test")) + evChan, err := stream.Subscribe("test") assert.Nilf(t, err, "Subscribe should not return an error") // setup the subscriber async @@ -100,10 +57,7 @@ func TestStream(t *testing.T) { } }() - err = stream.Publish("test", - events.WithPayload(payload), - events.WithMetadata(metadata), - ) + err = stream.Publish("test", payload, events.WithMetadata(metadata)) assert.Nil(t, err, "Publishing a valid message should not return an error") wg.Add(1) @@ -120,7 +74,7 @@ func TestStream(t *testing.T) { metadata := map[string]string{"foo": "bar"} // create the first subscriber - evChan1, err := stream.Subscribe(events.WithTopic(topic)) + evChan1, err := stream.Subscribe(topic) assert.Nilf(t, err, "Subscribe should not return an error") // setup the subscriber async @@ -145,16 +99,12 @@ func TestStream(t *testing.T) { } }() - err = stream.Publish(topic, - events.WithPayload(payload), - events.WithMetadata(metadata), - ) + err = stream.Publish(topic, payload, events.WithMetadata(metadata)) assert.Nil(t, err, "Publishing a valid message should not return an error") wg.Add(2) // create the second subscriber - evChan2, err := stream.Subscribe( - events.WithTopic(topic), + evChan2, err := stream.Subscribe(topic, events.WithQueue("second_queue"), events.WithStartAtTime(time.Now().Add(time.Minute*-1)), ) diff --git a/events/nats/nats.go b/events/nats/nats.go index aca0b538..2355cca5 100644 --- a/events/nats/nats.go +++ b/events/nats/nats.go @@ -16,7 +16,6 @@ import ( const ( defaultClusterID = "micro" - eventsTopic = "events" ) // NewStream returns an initialized nats stream or an error if the connection to the nats @@ -59,7 +58,7 @@ type stream struct { } // Publish a message to a topic -func (s *stream) Publish(topic string, opts ...events.PublishOption) error { +func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { // validate the topic if len(topic) == 0 { return events.ErrMissingTopic @@ -75,10 +74,10 @@ func (s *stream) Publish(topic string, opts ...events.PublishOption) error { // encode the message if it's not already encoded var payload []byte - if p, ok := options.Payload.([]byte); ok { + if p, ok := msg.([]byte); ok { payload = p } else { - p, err := json.Marshal(options.Payload) + p, err := json.Marshal(msg) if err != nil { return events.ErrEncodingMessage } @@ -100,11 +99,6 @@ func (s *stream) Publish(topic string, opts ...events.PublishOption) error { return errors.Wrap(err, "Error encoding event") } - // publish the event to the events channel - if _, err := s.conn.PublishAsync(eventsTopic, bytes, nil); err != nil { - return errors.Wrap(err, "Error publishing message to events") - } - // publish the event to the topic's channel if _, err := s.conn.PublishAsync(event.Topic, bytes, nil); err != nil { return errors.Wrap(err, "Error publishing message to topic") @@ -114,10 +108,14 @@ func (s *stream) Publish(topic string, opts ...events.PublishOption) error { } // Subscribe to a topic -func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, error) { +func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, events.ErrMissingTopic + } + // parse the options options := events.SubscribeOptions{ - Topic: eventsTopic, Queue: uuid.New().String(), } for _, o := range opts { @@ -147,7 +145,7 @@ func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, // setup the options subOpts := []stan.SubscriptionOption{ - stan.DurableName(options.Topic), + stan.DurableName(topic), stan.SetManualAckMode(), } if options.StartAtTime.Unix() > 0 { @@ -155,7 +153,7 @@ func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, } // connect the subscriber - _, err := s.conn.QueueSubscribe(options.Topic, options.Queue, handleMsg, subOpts...) + _, err := s.conn.QueueSubscribe(topic, options.Queue, handleMsg, subOpts...) if err != nil { return nil, errors.Wrap(err, "Error subscribing to topic") } diff --git a/events/nats/nats_test.go b/events/nats/nats_test.go index 2122a1b2..c1465583 100644 --- a/events/nats/nats_test.go +++ b/events/nats/nats_test.go @@ -36,53 +36,10 @@ func TestStream(t *testing.T) { // TestMissingTopic will test the topic validation on publish t.Run("TestMissingTopic", func(t *testing.T) { - err := stream.Publish("") + err := stream.Publish("", nil) assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") }) - // TestFirehose will publish a message to the test topic. The subscriber will subscribe to the - // firehose topic (indicated by a lack of the topic option). - t.Run("TestFirehose", func(t *testing.T) { - payload := &testPayload{Message: "HelloWorld"} - metadata := map[string]string{"foo": "bar"} - - // create the subscriber - evChan, err := stream.Subscribe() - assert.Nilf(t, err, "Subscribe should not return an error") - - // setup the subscriber async - var wg sync.WaitGroup - - go func() { - timeout := time.NewTimer(time.Millisecond * 250) - - select { - case event, _ := <-evChan: - assert.NotNilf(t, event, "The message was nil") - assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") - - var result testPayload - err = event.Unmarshal(&result) - assert.Nil(t, err, "Error decoding result") - assert.Equal(t, result, *payload, "Payload didn't match") - - wg.Done() - case <-timeout.C: - t.Fatalf("Event was not recieved") - } - }() - - err = stream.Publish("test", - events.WithPayload(payload), - events.WithMetadata(metadata), - ) - assert.Nil(t, err, "Publishing a valid message should not return an error") - wg.Add(1) - - // wait for the subscriber to recieve the message or timeout - wg.Wait() - }) - // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the // same test topic. t.Run("TestSubscribeTopic", func(t *testing.T) { @@ -90,7 +47,7 @@ func TestStream(t *testing.T) { metadata := map[string]string{"foo": "bar"} // create the subscriber - evChan, err := stream.Subscribe(events.WithTopic("test")) + evChan, err := stream.Subscribe("test") assert.Nilf(t, err, "Subscribe should not return an error") // setup the subscriber async @@ -115,10 +72,7 @@ func TestStream(t *testing.T) { } }() - err = stream.Publish("test", - events.WithPayload(payload), - events.WithMetadata(metadata), - ) + err = stream.Publish("test", payload, events.WithMetadata(metadata)) assert.Nil(t, err, "Publishing a valid message should not return an error") wg.Add(1) @@ -135,7 +89,7 @@ func TestStream(t *testing.T) { metadata := map[string]string{"foo": "bar"} // create the first subscriber - evChan1, err := stream.Subscribe(events.WithTopic(topic)) + evChan1, err := stream.Subscribe(topic) assert.Nilf(t, err, "Subscribe should not return an error") // setup the subscriber async @@ -160,16 +114,12 @@ func TestStream(t *testing.T) { } }() - err = stream.Publish(topic, - events.WithPayload(payload), - events.WithMetadata(metadata), - ) + err = stream.Publish(topic, payload, events.WithMetadata(metadata)) assert.Nil(t, err, "Publishing a valid message should not return an error") wg.Add(2) // create the second subscriber - evChan2, err := stream.Subscribe( - events.WithTopic(topic), + evChan2, err := stream.Subscribe(topic, events.WithQueue("second_queue"), events.WithStartAtTime(time.Now().Add(time.Minute*-1)), ) diff --git a/events/options.go b/events/options.go index fe3096e8..9c39d91f 100644 --- a/events/options.go +++ b/events/options.go @@ -6,9 +6,6 @@ import "time" type PublishOptions struct { // Metadata contains any keys which can be used to query the data, for example a customer id Metadata map[string]string - // Payload contains any additonal data which is relevent to the event but does not need to be - // indexed such as structured data - Payload interface{} // Timestamp to set for the event, if the timestamp is a zero value, the current time will be used Timestamp time.Time } @@ -23,13 +20,6 @@ func WithMetadata(md map[string]string) PublishOption { } } -// WithPayload sets the payload field on PublishOptions -func WithPayload(p interface{}) PublishOption { - return func(o *PublishOptions) { - o.Payload = p - } -} - // WithTimestamp sets the timestamp field on PublishOptions func WithTimestamp(t time.Time) PublishOption { return func(o *PublishOptions) { @@ -42,9 +32,6 @@ type SubscribeOptions struct { // Queue is the name of the subscribers queue, if two subscribers have the same queue the message // should only be published to one of them Queue string - // Topic to subscribe to, if left blank the consumer will be subscribed to the firehouse topic which - // recieves all events - Topic string // StartAtTime is the time from which the messages should be consumed from. If not provided then // the messages will be consumed starting from the moment the Subscription starts. StartAtTime time.Time @@ -60,13 +47,6 @@ func WithQueue(q string) SubscribeOption { } } -// WithTopic sets the topic to subscribe to -func WithTopic(t string) SubscribeOption { - return func(o *SubscribeOptions) { - o.Topic = t - } -} - // WithStartAtTime sets the StartAtTime field on SubscribeOptions to the value provided func WithStartAtTime(t time.Time) SubscribeOption { return func(o *SubscribeOptions) {