micro/events/store/store.go

104 lines
2.1 KiB
Go
Raw Normal View History

package store
import (
"encoding/json"
"time"
"github.com/micro/go-micro/v3/events"
gostore "github.com/micro/go-micro/v3/store"
"github.com/micro/go-micro/v3/store/memory"
"github.com/pkg/errors"
)
const joinKey = "/"
// NewStore returns an initialized events store
func NewStore(opts ...Option) events.Store {
// parse the options
var options Options
for _, o := range opts {
o(&options)
}
if options.TTL.Seconds() == 0 {
options.TTL = time.Hour * 24
}
if options.Store == nil {
options.Store = memory.NewStore()
}
// return the store
return &evStore{options}
}
type evStore struct {
opts Options
}
// Read events for a topic
func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, events.ErrMissingTopic
}
// parse the options
options := events.ReadOptions{
Offset: 0,
Limit: 250,
}
for _, o := range opts {
o(&options)
}
// execute the request
recs, err := s.opts.Store.Read(topic+joinKey,
gostore.ReadPrefix(),
gostore.ReadLimit(options.Limit),
gostore.ReadOffset(options.Offset),
)
if err != nil {
return nil, errors.Wrap(err, "Error reading from store")
}
// unmarshal the result
result := make([]*events.Event, len(recs))
for i, r := range recs {
var e events.Event
if err := json.Unmarshal(r.Value, &e); err != nil {
return nil, errors.Wrap(err, "Invalid event returned from stroe")
}
result[i] = &e
}
return result, nil
}
// Write an event to the store
func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error {
// parse the options
options := events.WriteOptions{
TTL: s.opts.TTL,
}
for _, o := range opts {
o(&options)
}
// construct the store record
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error mashaling event to JSON")
}
record := &gostore.Record{
Key: event.Topic + joinKey + event.ID,
Value: bytes,
Expiry: options.TTL,
}
// write the record to the store
if err := s.opts.Store.Write(record); err != nil {
return errors.Wrap(err, "Error writing to the store")
}
return nil
}