remove events store

This commit is contained in:
Asim Aslam
2020-10-17 10:04:18 +01:00
parent 1ae2ac9020
commit c7f24aa425
9 changed files with 61 additions and 232 deletions

View File

@@ -14,16 +14,11 @@ var (
ErrEncodingMessage = errors.New("Error encoding message") ErrEncodingMessage = errors.New("Error encoding message")
) )
// Stream is an event streaming interface // Stream is an events streaming interface
type Stream interface { type Stream interface {
Publish(topic string, msg interface{}, opts ...PublishOption) error Publish(topic string, msg interface{}, opts ...PublishOption) error
Subscribe(topic string, opts ...SubscribeOption) (<-chan Event, error) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
} String() string
// Store is an event store interface
type Store interface {
Read(topic string, opts ...ReadOption) ([]*Event, error)
Write(event *Event, opts ...WriteOption) error
} }
type AckFunc func() error type AckFunc func() error

View File

@@ -15,9 +15,9 @@ import (
) )
// NewStream returns an initialized memory stream // NewStream returns an initialized memory stream
func NewStream(opts ...Option) (events.Stream, error) { func NewStream(opts ...events.Option) (events.Stream, error) {
// parse the options // parse the options
var options Options var options events.Options
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@@ -25,7 +25,7 @@ func NewStream(opts ...Option) (events.Stream, error) {
options.Store = memory.NewStore() options.Store = memory.NewStore()
} }
return &mem{store: options.Store}, nil return &memoryStream{store: options.Store}, nil
} }
type subscriber struct { type subscriber struct {
@@ -40,14 +40,14 @@ type subscriber struct {
ackWait time.Duration ackWait time.Duration
} }
type mem struct { type memoryStream struct {
store store.Store store store.Store
subs []*subscriber subs []*subscriber
sync.RWMutex sync.RWMutex
} }
func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { func (m *memoryStream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// validate the topic // validate the topic
if len(topic) == 0 { if len(topic) == 0 {
return events.ErrMissingTopic return events.ErrMissingTopic
@@ -100,14 +100,14 @@ func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOptio
return nil return nil
} }
func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) { func (m *memoryStream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) {
// validate the topic // validate the topic
if len(topic) == 0 { if len(topic) == 0 {
return nil, events.ErrMissingTopic return nil, events.ErrMissingTopic
} }
// parse the options // parse the options
options := events.SubscribeOptions{ options := events.ConsumeOptions{
Queue: uuid.New().String(), Queue: uuid.New().String(),
AutoAck: true, AutoAck: true,
} }
@@ -150,7 +150,7 @@ func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan ev
// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends // lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends
// them into the subscribers channel // them into the subscribers channel
func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { func (m *memoryStream) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
// lookup all events which match the topic (a blank topic will return all results) // lookup all events which match the topic (a blank topic will return all results)
recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix())
if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
@@ -173,8 +173,12 @@ func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
} }
} }
func (m *memoryStream) String() string {
return "memory"
}
// handleEvents sends the event to any registered subscribers. // handleEvents sends the event to any registered subscribers.
func (m *mem) handleEvent(ev *events.Event) { func (m *memoryStream) handleEvent(ev *events.Event) {
m.RLock() m.RLock()
subs := m.subs subs := m.subs
m.RUnlock() m.RUnlock()

View File

@@ -1,6 +1,33 @@
package events package events
import "time" import (
"time"
"github.com/micro/go-micro/v3/store"
)
// Options which are used to configure the in-memory stream
type Options struct {
Store store.Store
TTL time.Duration
}
// Option is a function which configures options
type Option func(o *Options)
// WithStore sets the underlying store to use
func WithStore(s store.Store) Option {
return func(o *Options) {
o.Store = s
}
}
// WithTTL sets the default TTL
func StreamTTL(ttl time.Duration) Option {
return func(o *Options) {
o.TTL = ttl
}
}
// PublishOptions contains all the options which can be provided when publishing an event // PublishOptions contains all the options which can be provided when publishing an event
type PublishOptions struct { type PublishOptions struct {
@@ -27,8 +54,8 @@ func WithTimestamp(t time.Time) PublishOption {
} }
} }
// SubscribeOptions contains all the options which can be provided when subscribing to a topic // ConsumeOptions contains all the options which can be provided when subscribing to a topic
type SubscribeOptions struct { type ConsumeOptions struct {
// Queue is the name of the subscribers queue, if two subscribers have the same queue the message // Queue is the name of the subscribers queue, if two subscribers have the same queue the message
// should only be published to one of them // should only be published to one of them
Queue string Queue string
@@ -48,42 +75,42 @@ type SubscribeOptions struct {
CustomRetries bool CustomRetries bool
} }
// SubscribeOption sets attributes on SubscribeOptions // ConsumeOption sets attributes on ConsumeOptions
type SubscribeOption func(o *SubscribeOptions) type ConsumeOption func(o *ConsumeOptions)
// WithQueue sets the Queue fielf on SubscribeOptions to the value provided // WithQueue sets the Queue fielf on ConsumeOptions to the value provided
func WithQueue(q string) SubscribeOption { func WithQueue(q string) ConsumeOption {
return func(o *SubscribeOptions) { return func(o *ConsumeOptions) {
o.Queue = q o.Queue = q
} }
} }
// WithStartAtTime sets the StartAtTime field on SubscribeOptions to the value provided // WithStartAtTime sets the StartAtTime field on ConsumeOptions to the value provided
func WithStartAtTime(t time.Time) SubscribeOption { func WithStartAtTime(t time.Time) ConsumeOption {
return func(o *SubscribeOptions) { return func(o *ConsumeOptions) {
o.StartAtTime = t o.StartAtTime = t
} }
} }
// WithAutoAck sets the AutoAck field on SubscribeOptions and an ackWait duration after which if no ack is received // WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received
// the message is requeued in case auto ack is turned off // the message is requeued in case auto ack is turned off
func WithAutoAck(ack bool, ackWait time.Duration) SubscribeOption { func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption {
return func(o *SubscribeOptions) { return func(o *ConsumeOptions) {
o.AutoAck = ack o.AutoAck = ack
o.AckWait = ackWait o.AckWait = ackWait
} }
} }
// WithRetryLimit sets the RetryLimit field on SubscribeOptions. // WithRetryLimit sets the RetryLimit field on ConsumeOptions.
// Set to -1 for infinite retries (default) // Set to -1 for infinite retries (default)
func WithRetryLimit(retries int) SubscribeOption { func WithRetryLimit(retries int) ConsumeOption {
return func(o *SubscribeOptions) { return func(o *ConsumeOptions) {
o.RetryLimit = retries o.RetryLimit = retries
o.CustomRetries = true o.CustomRetries = true
} }
} }
func (s SubscribeOptions) GetRetryLimit() int { func (s ConsumeOptions) GetRetryLimit() int {
if !s.CustomRetries { if !s.CustomRetries {
return -1 return -1
} }

View File

@@ -1,28 +0,0 @@
package store
import (
"time"
"github.com/micro/go-micro/v3/store"
)
type Options struct {
Store store.Store
TTL time.Duration
}
type Option func(o *Options)
// WithStore sets the underlying store to use
func WithStore(s store.Store) Option {
return func(o *Options) {
o.Store = s
}
}
// WithTTL sets the default TTL
func WithTTL(ttl time.Duration) Option {
return func(o *Options) {
o.TTL = ttl
}
}

View File

@@ -1,103 +0,0 @@
package store
import (
"encoding/json"
"time"
"github.com/micro/go-micro/v3/events"
gostore "github.com/micro/go-micro/v3/store"
"github.com/micro/go-micro/v3/store/memory"
"github.com/pkg/errors"
)
const joinKey = "/"
// NewStore returns an initialized events store
func NewStore(opts ...Option) events.Store {
// parse the options
var options Options
for _, o := range opts {
o(&options)
}
if options.TTL.Seconds() == 0 {
options.TTL = time.Hour * 24
}
if options.Store == nil {
options.Store = memory.NewStore()
}
// return the store
return &evStore{options}
}
type evStore struct {
opts Options
}
// Read events for a topic
func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, events.ErrMissingTopic
}
// parse the options
options := events.ReadOptions{
Offset: 0,
Limit: 250,
}
for _, o := range opts {
o(&options)
}
// execute the request
recs, err := s.opts.Store.Read(topic+joinKey,
gostore.ReadPrefix(),
gostore.ReadLimit(options.Limit),
gostore.ReadOffset(options.Offset),
)
if err != nil {
return nil, errors.Wrap(err, "Error reading from store")
}
// unmarshal the result
result := make([]*events.Event, len(recs))
for i, r := range recs {
var e events.Event
if err := json.Unmarshal(r.Value, &e); err != nil {
return nil, errors.Wrap(err, "Invalid event returned from stroe")
}
result[i] = &e
}
return result, nil
}
// Write an event to the store
func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error {
// parse the options
options := events.WriteOptions{
TTL: s.opts.TTL,
}
for _, o := range opts {
o(&options)
}
// construct the store record
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error mashaling event to JSON")
}
record := &gostore.Record{
Key: event.Topic + joinKey + event.ID,
Value: bytes,
Expiry: options.TTL,
}
// write the record to the store
if err := s.opts.Store.Write(record); err != nil {
return errors.Wrap(err, "Error writing to the store")
}
return nil
}

View File

@@ -1,48 +0,0 @@
package store
import (
"testing"
"github.com/google/uuid"
"github.com/micro/go-micro/v3/events"
"github.com/stretchr/testify/assert"
)
func TestStore(t *testing.T) {
store := NewStore()
testData := []events.Event{
{ID: uuid.New().String(), Topic: "foo"},
{ID: uuid.New().String(), Topic: "foo"},
{ID: uuid.New().String(), Topic: "bar"},
}
// write the records to the store
t.Run("Write", func(t *testing.T) {
for _, event := range testData {
err := store.Write(&event)
assert.Nilf(t, err, "Writing an event should not return an error")
}
})
// should not be able to read events from a blank topic
t.Run("ReadMissingTopic", func(t *testing.T) {
evs, err := store.Read("")
assert.Equal(t, err, events.ErrMissingTopic, "Reading a blank topic should return an error")
assert.Nil(t, evs, "No events should be returned")
})
// should only get the events from the topic requested
t.Run("ReadTopic", func(t *testing.T) {
evs, err := store.Read("foo")
assert.Nilf(t, err, "No error should be returned")
assert.Len(t, evs, 2, "Only the events for this topic should be returned")
})
// limits should be honoured
t.Run("ReadTopicLimit", func(t *testing.T) {
evs, err := store.Read("foo", events.ReadLimit(1))
assert.Nilf(t, err, "No error should be returned")
assert.Len(t, evs, 1, "The result should include no more than the read limit")
})
}

View File

@@ -1,18 +0,0 @@
package memory
import "github.com/micro/go-micro/v3/store"
// Options which are used to configure the in-memory stream
type Options struct {
Store store.Store
}
// Option is a function which configures options
type Option func(o *Options)
// Store sets the store to use
func Store(s store.Store) Option {
return func(o *Options) {
o.Store = s
}
}