104 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			104 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package store
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/v3/events"
 | 
						|
	gostore "github.com/micro/go-micro/v3/store"
 | 
						|
	"github.com/micro/go-micro/v3/store/memory"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
)
 | 
						|
 | 
						|
const joinKey = "/"
 | 
						|
 | 
						|
// NewStore returns an initialized events store
 | 
						|
func NewStore(opts ...Option) events.Store {
 | 
						|
	// parse the options
 | 
						|
	var options Options
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
	if options.TTL.Seconds() == 0 {
 | 
						|
		options.TTL = time.Hour * 24
 | 
						|
	}
 | 
						|
	if options.Store == nil {
 | 
						|
		options.Store = memory.NewStore()
 | 
						|
	}
 | 
						|
 | 
						|
	// return the store
 | 
						|
	return &evStore{options}
 | 
						|
}
 | 
						|
 | 
						|
type evStore struct {
 | 
						|
	opts Options
 | 
						|
}
 | 
						|
 | 
						|
// Read events for a topic
 | 
						|
func (s *evStore) Read(topic string, opts ...events.ReadOption) ([]*events.Event, error) {
 | 
						|
	// validate the topic
 | 
						|
	if len(topic) == 0 {
 | 
						|
		return nil, events.ErrMissingTopic
 | 
						|
	}
 | 
						|
 | 
						|
	// parse the options
 | 
						|
	options := events.ReadOptions{
 | 
						|
		Offset: 0,
 | 
						|
		Limit:  250,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	// execute the request
 | 
						|
	recs, err := s.opts.Store.Read(topic+joinKey,
 | 
						|
		gostore.ReadPrefix(),
 | 
						|
		gostore.ReadLimit(options.Limit),
 | 
						|
		gostore.ReadOffset(options.Offset),
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Wrap(err, "Error reading from store")
 | 
						|
	}
 | 
						|
 | 
						|
	// unmarshal the result
 | 
						|
	result := make([]*events.Event, len(recs))
 | 
						|
	for i, r := range recs {
 | 
						|
		var e events.Event
 | 
						|
		if err := json.Unmarshal(r.Value, &e); err != nil {
 | 
						|
			return nil, errors.Wrap(err, "Invalid event returned from stroe")
 | 
						|
		}
 | 
						|
		result[i] = &e
 | 
						|
	}
 | 
						|
 | 
						|
	return result, nil
 | 
						|
}
 | 
						|
 | 
						|
// Write an event to the store
 | 
						|
func (s *evStore) Write(event *events.Event, opts ...events.WriteOption) error {
 | 
						|
	// parse the options
 | 
						|
	options := events.WriteOptions{
 | 
						|
		TTL: s.opts.TTL,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	// construct the store record
 | 
						|
	bytes, err := json.Marshal(event)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrap(err, "Error mashaling event to JSON")
 | 
						|
	}
 | 
						|
	record := &gostore.Record{
 | 
						|
		Key:    event.Topic + joinKey + event.ID,
 | 
						|
		Value:  bytes,
 | 
						|
		Expiry: options.TTL,
 | 
						|
	}
 | 
						|
 | 
						|
	// write the record to the store
 | 
						|
	if err := s.opts.Store.Write(record); err != nil {
 | 
						|
		return errors.Wrap(err, "Error writing to the store")
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |