events: add store implementation (#1957)
This commit is contained in:
parent
8738ed7757
commit
329bc2f265
@ -22,7 +22,7 @@ type Stream interface {
|
|||||||
|
|
||||||
// Store of events
|
// Store of events
|
||||||
type Store interface {
|
type Store interface {
|
||||||
Read(opts ...ReadOption) ([]*Event, error)
|
Read(topic string, opts ...ReadOption) ([]*Event, error)
|
||||||
Write(event *Event, opts ...WriteOption) error
|
Write(event *Event, opts ...WriteOption) error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,7 +34,7 @@ type Event struct {
|
|||||||
Topic string
|
Topic string
|
||||||
// Timestamp of the event
|
// Timestamp of the event
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
// Metadata contains the encoded event was indexed by
|
// Metadata contains the values the event was indexed by
|
||||||
Metadata map[string]string
|
Metadata map[string]string
|
||||||
// Payload contains the encoded message
|
// Payload contains the encoded message
|
||||||
Payload []byte
|
Payload []byte
|
||||||
|
@ -73,47 +73,24 @@ func WithTTL(d time.Duration) WriteOption {
|
|||||||
|
|
||||||
// ReadOptions contains all the options which can be provided when reading events from a store
|
// ReadOptions contains all the options which can be provided when reading events from a store
|
||||||
type ReadOptions struct {
|
type ReadOptions struct {
|
||||||
// Topic to read events from, if no topic is provided events from all topics will be returned
|
|
||||||
Topic string
|
|
||||||
// Query to filter the results using. The store will query the metadata provided when the event
|
|
||||||
// was written to the store
|
|
||||||
Query map[string]string
|
|
||||||
// Limit the number of results to return
|
// Limit the number of results to return
|
||||||
Limit int
|
Limit uint
|
||||||
// Offset the results by this number, useful for paginated queries
|
// Offset the results by this number, useful for paginated queries
|
||||||
Offset int
|
Offset uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadOption sets attributes on ReadOptions
|
// ReadOption sets attributes on ReadOptions
|
||||||
type ReadOption func(o *ReadOptions)
|
type ReadOption func(o *ReadOptions)
|
||||||
|
|
||||||
// ReadTopic sets the topic attribute on ReadOptions
|
|
||||||
func ReadTopic(t string) ReadOption {
|
|
||||||
return func(o *ReadOptions) {
|
|
||||||
o.Topic = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadFilter sets a key and value in the query
|
|
||||||
func ReadFilter(key, value string) ReadOption {
|
|
||||||
return func(o *ReadOptions) {
|
|
||||||
if o.Query == nil {
|
|
||||||
o.Query = map[string]string{key: value}
|
|
||||||
} else {
|
|
||||||
o.Query[key] = value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadLimit sets the limit attribute on ReadOptions
|
// ReadLimit sets the limit attribute on ReadOptions
|
||||||
func ReadLimit(l int) ReadOption {
|
func ReadLimit(l uint) ReadOption {
|
||||||
return func(o *ReadOptions) {
|
return func(o *ReadOptions) {
|
||||||
o.Limit = 1
|
o.Limit = 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadOffset sets the offset attribute on ReadOptions
|
// ReadOffset sets the offset attribute on ReadOptions
|
||||||
func ReadOffset(l int) ReadOption {
|
func ReadOffset(l uint) ReadOption {
|
||||||
return func(o *ReadOptions) {
|
return func(o *ReadOptions) {
|
||||||
o.Offset = 1
|
o.Offset = 1
|
||||||
}
|
}
|
||||||
|
28
events/store/options.go
Normal file
28
events/store/options.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v3/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
Store store.Store
|
||||||
|
TTL time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option func(o *Options)
|
||||||
|
|
||||||
|
// WithStore sets the underlying store to use
|
||||||
|
func WithStore(s store.Store) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Store = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTTL sets the default TTL
|
||||||
|
func WithTTL(ttl time.Duration) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.TTL = ttl
|
||||||
|
}
|
||||||
|
}
|
103
events/store/store.go
Normal file
103
events/store/store.go
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/v3/events"
|
||||||
|
gostore "github.com/micro/go-micro/v3/store"
|
||||||
|
"github.com/micro/go-micro/v3/store/memory"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const joinKey = "/"
|
||||||
|
|
||||||
|
// NewStore returns an initialized events store
|
||||||
|
func NewStore(opts ...Option) events.Store {
|
||||||
|
// parse the options
|
||||||
|
var options Options
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
if options.TTL.Seconds() == 0 {
|
||||||
|
options.TTL = time.Hour * 24
|
||||||
|
}
|
||||||
|
if options.Store == nil {
|
||||||
|
options.Store = memory.NewStore()
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the store
|
||||||
|
return &evStore{options}
|
||||||
|
}
|
||||||
|
|
||||||
|
type evStore struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read events for a topic
|
||||||
|
func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) {
|
||||||
|
// validate the topic
|
||||||
|
if len(topic) == 0 {
|
||||||
|
return nil, events.ErrMissingTopic
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse the options
|
||||||
|
options := events.ReadOptions{
|
||||||
|
Offset: 0,
|
||||||
|
Limit: 250,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute the request
|
||||||
|
recs, err := s.opts.Store.Read(topic+joinKey,
|
||||||
|
gostore.ReadPrefix(),
|
||||||
|
gostore.ReadLimit(options.Limit),
|
||||||
|
gostore.ReadOffset(options.Offset),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "Error reading from store")
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshal the result
|
||||||
|
result := make([]*events.Event, len(recs))
|
||||||
|
for i, r := range recs {
|
||||||
|
var e events.Event
|
||||||
|
if err := json.Unmarshal(r.Value, &e); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "Invalid event returned from stroe")
|
||||||
|
}
|
||||||
|
result[i] = &e
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write an event to the store
|
||||||
|
func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error {
|
||||||
|
// parse the options
|
||||||
|
options := events.WriteOptions{
|
||||||
|
TTL: s.opts.TTL,
|
||||||
|
}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
|
||||||
|
// construct the store record
|
||||||
|
bytes, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "Error mashaling event to JSON")
|
||||||
|
}
|
||||||
|
record := &gostore.Record{
|
||||||
|
Key: event.Topic + joinKey + event.ID,
|
||||||
|
Value: bytes,
|
||||||
|
Expiry: options.TTL,
|
||||||
|
}
|
||||||
|
|
||||||
|
// write the record to the store
|
||||||
|
if err := s.opts.Store.Write(record); err != nil {
|
||||||
|
return errors.Wrap(err, "Error writing to the store")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
48
events/store/store_test.go
Normal file
48
events/store/store_test.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/micro/go-micro/v3/events"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStore(t *testing.T) {
|
||||||
|
store := NewStore()
|
||||||
|
|
||||||
|
testData := []events.Event{
|
||||||
|
{ID: uuid.New().String(), Topic: "foo"},
|
||||||
|
{ID: uuid.New().String(), Topic: "foo"},
|
||||||
|
{ID: uuid.New().String(), Topic: "bar"},
|
||||||
|
}
|
||||||
|
|
||||||
|
// write the records to the store
|
||||||
|
t.Run("Write", func(t *testing.T) {
|
||||||
|
for _, event := range testData {
|
||||||
|
err := store.Write(&event)
|
||||||
|
assert.Nilf(t, err, "Writing an event should not return an error")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// should not be able to read events from a blank topic
|
||||||
|
t.Run("ReadMissingTopic", func(t *testing.T) {
|
||||||
|
evs, err := store.Read("")
|
||||||
|
assert.Equal(t, err, events.ErrMissingTopic, "Reading a blank topic should return an error")
|
||||||
|
assert.Nil(t, evs, "No events should be returned")
|
||||||
|
})
|
||||||
|
|
||||||
|
// should only get the events from the topic requested
|
||||||
|
t.Run("ReadTopic", func(t *testing.T) {
|
||||||
|
evs, err := store.Read("foo")
|
||||||
|
assert.Nilf(t, err, "No error should be returned")
|
||||||
|
assert.Len(t, evs, 2, "Only the events for this topic should be returned")
|
||||||
|
})
|
||||||
|
|
||||||
|
// limits should be honoured
|
||||||
|
t.Run("ReadTopicLimit", func(t *testing.T) {
|
||||||
|
evs, err := store.Read("foo", events.ReadLimit(1))
|
||||||
|
assert.Nilf(t, err, "No error should be returned")
|
||||||
|
assert.Len(t, evs, 1, "The result should include no more than the read limit")
|
||||||
|
})
|
||||||
|
}
|
@ -123,35 +123,19 @@ func (m *memoryStore) delete(prefix, key string) {
|
|||||||
|
|
||||||
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
||||||
allItems := m.store.Items()
|
allItems := m.store.Items()
|
||||||
allKeys := make([]string, len(allItems))
|
keys := make([]string, len(allItems))
|
||||||
i := 0
|
i := 0
|
||||||
|
|
||||||
for k := range allItems {
|
for k := range allItems {
|
||||||
if !strings.HasPrefix(k, prefix+"/") {
|
if !strings.HasPrefix(k, prefix+"/") {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
allKeys[i] = strings.TrimPrefix(k, prefix+"/")
|
keys[i] = strings.TrimPrefix(k, prefix+"/")
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
if limit != 0 || offset != 0 {
|
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
|
||||||
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
|
return applyLimitAndOffset(keys, limit, offset)
|
||||||
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
|
|
||||||
end := len(allKeys)
|
|
||||||
if limit > 0 {
|
|
||||||
calcLimit := int(offset + limit)
|
|
||||||
if calcLimit < end {
|
|
||||||
end = calcLimit
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if int(offset) >= end {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return allKeys[offset:end]
|
|
||||||
}
|
|
||||||
|
|
||||||
return allKeys
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Close() error {
|
func (m *memoryStore) Close() error {
|
||||||
@ -179,12 +163,10 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor
|
|||||||
prefix := m.prefix(readOpts.Database, readOpts.Table)
|
prefix := m.prefix(readOpts.Database, readOpts.Table)
|
||||||
|
|
||||||
var keys []string
|
var keys []string
|
||||||
|
|
||||||
// Handle Prefix / suffix
|
// Handle Prefix / suffix
|
||||||
if readOpts.Prefix || readOpts.Suffix {
|
if readOpts.Prefix || readOpts.Suffix {
|
||||||
k := m.list(prefix, readOpts.Limit, readOpts.Offset)
|
// apply limit / offset once filtering is complete
|
||||||
|
for _, kk := range m.list(prefix, 0, 0) {
|
||||||
for _, kk := range k {
|
|
||||||
if readOpts.Prefix && !strings.HasPrefix(kk, key) {
|
if readOpts.Prefix && !strings.HasPrefix(kk, key) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -195,6 +177,8 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor
|
|||||||
|
|
||||||
keys = append(keys, kk)
|
keys = append(keys, kk)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
keys = applyLimitAndOffset(keys, readOpts.Limit, readOpts.Offset)
|
||||||
} else {
|
} else {
|
||||||
keys = []string{key}
|
keys = []string{key}
|
||||||
}
|
}
|
||||||
@ -297,3 +281,23 @@ func (m *memoryStore) List(opts ...store.ListOption) ([]string, error) {
|
|||||||
|
|
||||||
return keys, nil
|
return keys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func applyLimitAndOffset(keys []string, limit, offset uint) []string {
|
||||||
|
if limit == 0 && offset == 0 {
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
end := len(keys)
|
||||||
|
if limit > 0 {
|
||||||
|
calcLimit := int(offset + limit)
|
||||||
|
if calcLimit < end {
|
||||||
|
end = calcLimit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if int(offset) >= end {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return keys[offset:end]
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user