diff --git a/events/events.go b/events/events.go index fb55bb3f..91f7134a 100644 --- a/events/events.go +++ b/events/events.go @@ -22,7 +22,7 @@ type Stream interface { // Store of events type Store interface { - Read(opts ...ReadOption) ([]*Event, error) + Read(topic string, opts ...ReadOption) ([]*Event, error) Write(event *Event, opts ...WriteOption) error } @@ -34,7 +34,7 @@ type Event struct { Topic string // Timestamp of the event Timestamp time.Time - // Metadata contains the encoded event was indexed by + // Metadata contains the values the event was indexed by Metadata map[string]string // Payload contains the encoded message Payload []byte diff --git a/events/options.go b/events/options.go index 9c39d91f..bba3b487 100644 --- a/events/options.go +++ b/events/options.go @@ -73,47 +73,24 @@ func WithTTL(d time.Duration) WriteOption { // ReadOptions contains all the options which can be provided when reading events from a store type ReadOptions struct { - // Topic to read events from, if no topic is provided events from all topics will be returned - Topic string - // Query to filter the results using. The store will query the metadata provided when the event - // was written to the store - Query map[string]string // Limit the number of results to return - Limit int + Limit uint // Offset the results by this number, useful for paginated queries - Offset int + Offset uint } // ReadOption sets attributes on ReadOptions type ReadOption func(o *ReadOptions) -// ReadTopic sets the topic attribute on ReadOptions -func ReadTopic(t string) ReadOption { - return func(o *ReadOptions) { - o.Topic = t - } -} - -// ReadFilter sets a key and value in the query -func ReadFilter(key, value string) ReadOption { - return func(o *ReadOptions) { - if o.Query == nil { - o.Query = map[string]string{key: value} - } else { - o.Query[key] = value - } - } -} - // ReadLimit sets the limit attribute on ReadOptions -func ReadLimit(l int) ReadOption { +func ReadLimit(l uint) ReadOption { return func(o *ReadOptions) { o.Limit = 1 } } // ReadOffset sets the offset attribute on ReadOptions -func ReadOffset(l int) ReadOption { +func ReadOffset(l uint) ReadOption { return func(o *ReadOptions) { o.Offset = 1 } diff --git a/events/store/options.go b/events/store/options.go new file mode 100644 index 00000000..c766ee6c --- /dev/null +++ b/events/store/options.go @@ -0,0 +1,28 @@ +package store + +import ( + "time" + + "github.com/micro/go-micro/v3/store" +) + +type Options struct { + Store store.Store + TTL time.Duration +} + +type Option func(o *Options) + +// WithStore sets the underlying store to use +func WithStore(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// WithTTL sets the default TTL +func WithTTL(ttl time.Duration) Option { + return func(o *Options) { + o.TTL = ttl + } +} diff --git a/events/store/store.go b/events/store/store.go new file mode 100644 index 00000000..acacf85e --- /dev/null +++ b/events/store/store.go @@ -0,0 +1,103 @@ +package store + +import ( + "encoding/json" + "time" + + "github.com/micro/go-micro/v3/events" + gostore "github.com/micro/go-micro/v3/store" + "github.com/micro/go-micro/v3/store/memory" + "github.com/pkg/errors" +) + +const joinKey = "/" + +// NewStore returns an initialized events store +func NewStore(opts ...Option) events.Store { + // parse the options + var options Options + for _, o := range opts { + o(&options) + } + if options.TTL.Seconds() == 0 { + options.TTL = time.Hour * 24 + } + if options.Store == nil { + options.Store = memory.NewStore() + } + + // return the store + return &evStore{options} +} + +type evStore struct { + opts Options +} + +// Read events for a topic +func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, events.ErrMissingTopic + } + + // parse the options + options := events.ReadOptions{ + Offset: 0, + Limit: 250, + } + for _, o := range opts { + o(&options) + } + + // execute the request + recs, err := s.opts.Store.Read(topic+joinKey, + gostore.ReadPrefix(), + gostore.ReadLimit(options.Limit), + gostore.ReadOffset(options.Offset), + ) + if err != nil { + return nil, errors.Wrap(err, "Error reading from store") + } + + // unmarshal the result + result := make([]*events.Event, len(recs)) + for i, r := range recs { + var e events.Event + if err := json.Unmarshal(r.Value, &e); err != nil { + return nil, errors.Wrap(err, "Invalid event returned from stroe") + } + result[i] = &e + } + + return result, nil +} + +// Write an event to the store +func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error { + // parse the options + options := events.WriteOptions{ + TTL: s.opts.TTL, + } + for _, o := range opts { + o(&options) + } + + // construct the store record + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error mashaling event to JSON") + } + record := &gostore.Record{ + Key: event.Topic + joinKey + event.ID, + Value: bytes, + Expiry: options.TTL, + } + + // write the record to the store + if err := s.opts.Store.Write(record); err != nil { + return errors.Wrap(err, "Error writing to the store") + } + + return nil +} diff --git a/events/store/store_test.go b/events/store/store_test.go new file mode 100644 index 00000000..b2b3d320 --- /dev/null +++ b/events/store/store_test.go @@ -0,0 +1,48 @@ +package store + +import ( + "testing" + + "github.com/google/uuid" + "github.com/micro/go-micro/v3/events" + "github.com/stretchr/testify/assert" +) + +func TestStore(t *testing.T) { + store := NewStore() + + testData := []events.Event{ + {ID: uuid.New().String(), Topic: "foo"}, + {ID: uuid.New().String(), Topic: "foo"}, + {ID: uuid.New().String(), Topic: "bar"}, + } + + // write the records to the store + t.Run("Write", func(t *testing.T) { + for _, event := range testData { + err := store.Write(&event) + assert.Nilf(t, err, "Writing an event should not return an error") + } + }) + + // should not be able to read events from a blank topic + t.Run("ReadMissingTopic", func(t *testing.T) { + evs, err := store.Read("") + assert.Equal(t, err, events.ErrMissingTopic, "Reading a blank topic should return an error") + assert.Nil(t, evs, "No events should be returned") + }) + + // should only get the events from the topic requested + t.Run("ReadTopic", func(t *testing.T) { + evs, err := store.Read("foo") + assert.Nilf(t, err, "No error should be returned") + assert.Len(t, evs, 2, "Only the events for this topic should be returned") + }) + + // limits should be honoured + t.Run("ReadTopicLimit", func(t *testing.T) { + evs, err := store.Read("foo", events.ReadLimit(1)) + assert.Nilf(t, err, "No error should be returned") + assert.Len(t, evs, 1, "The result should include no more than the read limit") + }) +} diff --git a/events/memory/memory.go b/events/stream/memory/memory.go similarity index 100% rename from events/memory/memory.go rename to events/stream/memory/memory.go diff --git a/events/memory/memory_test.go b/events/stream/memory/memory_test.go similarity index 100% rename from events/memory/memory_test.go rename to events/stream/memory/memory_test.go diff --git a/events/memory/options.go b/events/stream/memory/options.go similarity index 100% rename from events/memory/options.go rename to events/stream/memory/options.go diff --git a/events/nats/nats.go b/events/stream/nats/nats.go similarity index 100% rename from events/nats/nats.go rename to events/stream/nats/nats.go diff --git a/events/nats/nats_test.go b/events/stream/nats/nats_test.go similarity index 100% rename from events/nats/nats_test.go rename to events/stream/nats/nats_test.go diff --git a/events/nats/options.go b/events/stream/nats/options.go similarity index 100% rename from events/nats/options.go rename to events/stream/nats/options.go diff --git a/store/memory/memory.go b/store/memory/memory.go index cca91815..5ef77b60 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -123,35 +123,19 @@ func (m *memoryStore) delete(prefix, key string) { func (m *memoryStore) list(prefix string, limit, offset uint) []string { allItems := m.store.Items() - allKeys := make([]string, len(allItems)) + keys := make([]string, len(allItems)) i := 0 for k := range allItems { if !strings.HasPrefix(k, prefix+"/") { continue } - allKeys[i] = strings.TrimPrefix(k, prefix+"/") + keys[i] = strings.TrimPrefix(k, prefix+"/") i++ } - if limit != 0 || offset != 0 { - sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) - sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) - end := len(allKeys) - if limit > 0 { - calcLimit := int(offset + limit) - if calcLimit < end { - end = calcLimit - } - } - - if int(offset) >= end { - return nil - } - return allKeys[offset:end] - } - - return allKeys + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + return applyLimitAndOffset(keys, limit, offset) } func (m *memoryStore) Close() error { @@ -179,12 +163,10 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor prefix := m.prefix(readOpts.Database, readOpts.Table) var keys []string - // Handle Prefix / suffix if readOpts.Prefix || readOpts.Suffix { - k := m.list(prefix, readOpts.Limit, readOpts.Offset) - - for _, kk := range k { + // apply limit / offset once filtering is complete + for _, kk := range m.list(prefix, 0, 0) { if readOpts.Prefix && !strings.HasPrefix(kk, key) { continue } @@ -195,6 +177,8 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor keys = append(keys, kk) } + + keys = applyLimitAndOffset(keys, readOpts.Limit, readOpts.Offset) } else { keys = []string{key} } @@ -297,3 +281,23 @@ func (m *memoryStore) List(opts ...store.ListOption) ([]string, error) { return keys, nil } + +func applyLimitAndOffset(keys []string, limit, offset uint) []string { + if limit == 0 && offset == 0 { + return keys + } + + end := len(keys) + if limit > 0 { + calcLimit := int(offset + limit) + if calcLimit < end { + end = calcLimit + } + } + + if int(offset) >= end { + return nil + } + + return keys[offset:end] +}