104 lines
2.1 KiB
Go
104 lines
2.1 KiB
Go
|
package store
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"time"
|
||
|
|
||
|
"github.com/micro/go-micro/v3/events"
|
||
|
gostore "github.com/micro/go-micro/v3/store"
|
||
|
"github.com/micro/go-micro/v3/store/memory"
|
||
|
"github.com/pkg/errors"
|
||
|
)
|
||
|
|
||
|
const joinKey = "/"
|
||
|
|
||
|
// NewStore returns an initialized events store
|
||
|
func NewStore(opts ...Option) events.Store {
|
||
|
// parse the options
|
||
|
var options Options
|
||
|
for _, o := range opts {
|
||
|
o(&options)
|
||
|
}
|
||
|
if options.TTL.Seconds() == 0 {
|
||
|
options.TTL = time.Hour * 24
|
||
|
}
|
||
|
if options.Store == nil {
|
||
|
options.Store = memory.NewStore()
|
||
|
}
|
||
|
|
||
|
// return the store
|
||
|
return &evStore{options}
|
||
|
}
|
||
|
|
||
|
type evStore struct {
|
||
|
opts Options
|
||
|
}
|
||
|
|
||
|
// Read events for a topic
|
||
|
func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) {
|
||
|
// validate the topic
|
||
|
if len(topic) == 0 {
|
||
|
return nil, events.ErrMissingTopic
|
||
|
}
|
||
|
|
||
|
// parse the options
|
||
|
options := events.ReadOptions{
|
||
|
Offset: 0,
|
||
|
Limit: 250,
|
||
|
}
|
||
|
for _, o := range opts {
|
||
|
o(&options)
|
||
|
}
|
||
|
|
||
|
// execute the request
|
||
|
recs, err := s.opts.Store.Read(topic+joinKey,
|
||
|
gostore.ReadPrefix(),
|
||
|
gostore.ReadLimit(options.Limit),
|
||
|
gostore.ReadOffset(options.Offset),
|
||
|
)
|
||
|
if err != nil {
|
||
|
return nil, errors.Wrap(err, "Error reading from store")
|
||
|
}
|
||
|
|
||
|
// unmarshal the result
|
||
|
result := make([]*events.Event, len(recs))
|
||
|
for i, r := range recs {
|
||
|
var e events.Event
|
||
|
if err := json.Unmarshal(r.Value, &e); err != nil {
|
||
|
return nil, errors.Wrap(err, "Invalid event returned from stroe")
|
||
|
}
|
||
|
result[i] = &e
|
||
|
}
|
||
|
|
||
|
return result, nil
|
||
|
}
|
||
|
|
||
|
// Write an event to the store
|
||
|
func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error {
|
||
|
// parse the options
|
||
|
options := events.WriteOptions{
|
||
|
TTL: s.opts.TTL,
|
||
|
}
|
||
|
for _, o := range opts {
|
||
|
o(&options)
|
||
|
}
|
||
|
|
||
|
// construct the store record
|
||
|
bytes, err := json.Marshal(event)
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, "Error mashaling event to JSON")
|
||
|
}
|
||
|
record := &gostore.Record{
|
||
|
Key: event.Topic + joinKey + event.ID,
|
||
|
Value: bytes,
|
||
|
Expiry: options.TTL,
|
||
|
}
|
||
|
|
||
|
// write the record to the store
|
||
|
if err := s.opts.Store.Write(record); err != nil {
|
||
|
return errors.Wrap(err, "Error writing to the store")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|