diff --git a/events/events.go b/events/events.go index 76194495..1c799fe0 100644 --- a/events/events.go +++ b/events/events.go @@ -14,16 +14,11 @@ var ( ErrEncodingMessage = errors.New("Error encoding message") ) -// Stream is an event streaming interface +// Stream is an events streaming interface type Stream interface { Publish(topic string, msg interface{}, opts ...PublishOption) error - Subscribe(topic string, opts ...SubscribeOption) (<-chan Event, error) -} - -// Store is an event store interface -type Store interface { - Read(topic string, opts ...ReadOption) ([]*Event, error) - Write(event *Event, opts ...WriteOption) error + Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) + String() string } type AckFunc func() error diff --git a/events/stream/memory/memory.go b/events/memory/memory.go similarity index 89% rename from events/stream/memory/memory.go rename to events/memory/memory.go index f83bac7b..fe894cdc 100644 --- a/events/stream/memory/memory.go +++ b/events/memory/memory.go @@ -15,9 +15,9 @@ import ( ) // NewStream returns an initialized memory stream -func NewStream(opts ...Option) (events.Stream, error) { +func NewStream(opts ...events.Option) (events.Stream, error) { // parse the options - var options Options + var options events.Options for _, o := range opts { o(&options) } @@ -25,7 +25,7 @@ func NewStream(opts ...Option) (events.Stream, error) { options.Store = memory.NewStore() } - return &mem{store: options.Store}, nil + return &memoryStream{store: options.Store}, nil } type subscriber struct { @@ -40,14 +40,14 @@ type subscriber struct { ackWait time.Duration } -type mem struct { +type memoryStream struct { store store.Store subs []*subscriber sync.RWMutex } -func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { +func (m *memoryStream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { // validate the topic if len(topic) == 0 { return events.ErrMissingTopic @@ -100,14 +100,14 @@ func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOptio return nil } -func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) { +func (m *memoryStream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) { // validate the topic if len(topic) == 0 { return nil, events.ErrMissingTopic } // parse the options - options := events.SubscribeOptions{ + options := events.ConsumeOptions{ Queue: uuid.New().String(), AutoAck: true, } @@ -150,7 +150,7 @@ func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan ev // lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends // them into the subscribers channel -func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { +func (m *memoryStream) lookupPreviousEvents(sub *subscriber, startTime time.Time) { // lookup all events which match the topic (a blank topic will return all results) recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { @@ -173,8 +173,12 @@ func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { } } +func (m *memoryStream) String() string { + return "memory" +} + // handleEvents sends the event to any registered subscribers. -func (m *mem) handleEvent(ev *events.Event) { +func (m *memoryStream) handleEvent(ev *events.Event) { m.RLock() subs := m.subs m.RUnlock() diff --git a/events/stream/memory/memory_test.go b/events/memory/memory_test.go similarity index 100% rename from events/stream/memory/memory_test.go rename to events/memory/memory_test.go diff --git a/events/options.go b/events/options.go index d63ee26f..a42494c8 100644 --- a/events/options.go +++ b/events/options.go @@ -1,6 +1,33 @@ package events -import "time" +import ( + "time" + + "github.com/micro/go-micro/v3/store" +) + +// Options which are used to configure the in-memory stream +type Options struct { + Store store.Store + TTL time.Duration +} + +// Option is a function which configures options +type Option func(o *Options) + +// WithStore sets the underlying store to use +func WithStore(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// WithTTL sets the default TTL +func StreamTTL(ttl time.Duration) Option { + return func(o *Options) { + o.TTL = ttl + } +} // PublishOptions contains all the options which can be provided when publishing an event type PublishOptions struct { @@ -27,8 +54,8 @@ func WithTimestamp(t time.Time) PublishOption { } } -// SubscribeOptions contains all the options which can be provided when subscribing to a topic -type SubscribeOptions struct { +// ConsumeOptions contains all the options which can be provided when subscribing to a topic +type ConsumeOptions struct { // Queue is the name of the subscribers queue, if two subscribers have the same queue the message // should only be published to one of them Queue string @@ -48,42 +75,42 @@ type SubscribeOptions struct { CustomRetries bool } -// SubscribeOption sets attributes on SubscribeOptions -type SubscribeOption func(o *SubscribeOptions) +// ConsumeOption sets attributes on ConsumeOptions +type ConsumeOption func(o *ConsumeOptions) -// WithQueue sets the Queue fielf on SubscribeOptions to the value provided -func WithQueue(q string) SubscribeOption { - return func(o *SubscribeOptions) { +// WithQueue sets the Queue fielf on ConsumeOptions to the value provided +func WithQueue(q string) ConsumeOption { + return func(o *ConsumeOptions) { o.Queue = q } } -// WithStartAtTime sets the StartAtTime field on SubscribeOptions to the value provided -func WithStartAtTime(t time.Time) SubscribeOption { - return func(o *SubscribeOptions) { +// WithStartAtTime sets the StartAtTime field on ConsumeOptions to the value provided +func WithStartAtTime(t time.Time) ConsumeOption { + return func(o *ConsumeOptions) { o.StartAtTime = t } } -// WithAutoAck sets the AutoAck field on SubscribeOptions and an ackWait duration after which if no ack is received +// WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received // the message is requeued in case auto ack is turned off -func WithAutoAck(ack bool, ackWait time.Duration) SubscribeOption { - return func(o *SubscribeOptions) { +func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption { + return func(o *ConsumeOptions) { o.AutoAck = ack o.AckWait = ackWait } } -// WithRetryLimit sets the RetryLimit field on SubscribeOptions. +// WithRetryLimit sets the RetryLimit field on ConsumeOptions. // Set to -1 for infinite retries (default) -func WithRetryLimit(retries int) SubscribeOption { - return func(o *SubscribeOptions) { +func WithRetryLimit(retries int) ConsumeOption { + return func(o *ConsumeOptions) { o.RetryLimit = retries o.CustomRetries = true } } -func (s SubscribeOptions) GetRetryLimit() int { +func (s ConsumeOptions) GetRetryLimit() int { if !s.CustomRetries { return -1 } diff --git a/events/store/options.go b/events/store/options.go deleted file mode 100644 index c766ee6c..00000000 --- a/events/store/options.go +++ /dev/null @@ -1,28 +0,0 @@ -package store - -import ( - "time" - - "github.com/micro/go-micro/v3/store" -) - -type Options struct { - Store store.Store - TTL time.Duration -} - -type Option func(o *Options) - -// WithStore sets the underlying store to use -func WithStore(s store.Store) Option { - return func(o *Options) { - o.Store = s - } -} - -// WithTTL sets the default TTL -func WithTTL(ttl time.Duration) Option { - return func(o *Options) { - o.TTL = ttl - } -} diff --git a/events/store/store.go b/events/store/store.go deleted file mode 100644 index acacf85e..00000000 --- a/events/store/store.go +++ /dev/null @@ -1,103 +0,0 @@ -package store - -import ( - "encoding/json" - "time" - - "github.com/micro/go-micro/v3/events" - gostore "github.com/micro/go-micro/v3/store" - "github.com/micro/go-micro/v3/store/memory" - "github.com/pkg/errors" -) - -const joinKey = "/" - -// NewStore returns an initialized events store -func NewStore(opts ...Option) events.Store { - // parse the options - var options Options - for _, o := range opts { - o(&options) - } - if options.TTL.Seconds() == 0 { - options.TTL = time.Hour * 24 - } - if options.Store == nil { - options.Store = memory.NewStore() - } - - // return the store - return &evStore{options} -} - -type evStore struct { - opts Options -} - -// Read events for a topic -func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) { - // validate the topic - if len(topic) == 0 { - return nil, events.ErrMissingTopic - } - - // parse the options - options := events.ReadOptions{ - Offset: 0, - Limit: 250, - } - for _, o := range opts { - o(&options) - } - - // execute the request - recs, err := s.opts.Store.Read(topic+joinKey, - gostore.ReadPrefix(), - gostore.ReadLimit(options.Limit), - gostore.ReadOffset(options.Offset), - ) - if err != nil { - return nil, errors.Wrap(err, "Error reading from store") - } - - // unmarshal the result - result := make([]*events.Event, len(recs)) - for i, r := range recs { - var e events.Event - if err := json.Unmarshal(r.Value, &e); err != nil { - return nil, errors.Wrap(err, "Invalid event returned from stroe") - } - result[i] = &e - } - - return result, nil -} - -// Write an event to the store -func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error { - // parse the options - options := events.WriteOptions{ - TTL: s.opts.TTL, - } - for _, o := range opts { - o(&options) - } - - // construct the store record - bytes, err := json.Marshal(event) - if err != nil { - return errors.Wrap(err, "Error mashaling event to JSON") - } - record := &gostore.Record{ - Key: event.Topic + joinKey + event.ID, - Value: bytes, - Expiry: options.TTL, - } - - // write the record to the store - if err := s.opts.Store.Write(record); err != nil { - return errors.Wrap(err, "Error writing to the store") - } - - return nil -} diff --git a/events/store/store_test.go b/events/store/store_test.go deleted file mode 100644 index b2b3d320..00000000 --- a/events/store/store_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package store - -import ( - "testing" - - "github.com/google/uuid" - "github.com/micro/go-micro/v3/events" - "github.com/stretchr/testify/assert" -) - -func TestStore(t *testing.T) { - store := NewStore() - - testData := []events.Event{ - {ID: uuid.New().String(), Topic: "foo"}, - {ID: uuid.New().String(), Topic: "foo"}, - {ID: uuid.New().String(), Topic: "bar"}, - } - - // write the records to the store - t.Run("Write", func(t *testing.T) { - for _, event := range testData { - err := store.Write(&event) - assert.Nilf(t, err, "Writing an event should not return an error") - } - }) - - // should not be able to read events from a blank topic - t.Run("ReadMissingTopic", func(t *testing.T) { - evs, err := store.Read("") - assert.Equal(t, err, events.ErrMissingTopic, "Reading a blank topic should return an error") - assert.Nil(t, evs, "No events should be returned") - }) - - // should only get the events from the topic requested - t.Run("ReadTopic", func(t *testing.T) { - evs, err := store.Read("foo") - assert.Nilf(t, err, "No error should be returned") - assert.Len(t, evs, 2, "Only the events for this topic should be returned") - }) - - // limits should be honoured - t.Run("ReadTopicLimit", func(t *testing.T) { - evs, err := store.Read("foo", events.ReadLimit(1)) - assert.Nilf(t, err, "No error should be returned") - assert.Len(t, evs, 1, "The result should include no more than the read limit") - }) -} diff --git a/events/stream/memory/options.go b/events/stream/memory/options.go deleted file mode 100644 index 8f9f1cad..00000000 --- a/events/stream/memory/options.go +++ /dev/null @@ -1,18 +0,0 @@ -package memory - -import "github.com/micro/go-micro/v3/store" - -// Options which are used to configure the in-memory stream -type Options struct { - Store store.Store -} - -// Option is a function which configures options -type Option func(o *Options) - -// Store sets the store to use -func Store(s store.Store) Option { - return func(o *Options) { - o.Store = s - } -} diff --git a/events/stream/test/stream_test.go b/events/test/stream_test.go similarity index 100% rename from events/stream/test/stream_test.go rename to events/test/stream_test.go