commit c65fec6a6cda564934a93e91bbb46dbd83445720 Author: Janos Dobronszki Date: Tue Apr 7 13:53:22 2020 +0200 Disk backed local store (#1491) diff --git a/file.go b/file.go new file mode 100644 index 0000000..2c6b609 --- /dev/null +++ b/file.go @@ -0,0 +1,351 @@ +// Package local is a file system backed store +package file + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/micro/go-micro/v2/store" + micro_store "github.com/micro/go-micro/v2/store" + bolt "go.etcd.io/bbolt" + + "github.com/pkg/errors" +) + +// NewStore returns a memory store +func NewStore(opts ...store.Option) store.Store { + s := &fileStore{ + options: store.Options{}, + } + for _, o := range opts { + o(&s.options) + } + return s +} + +type fileStore struct { + options store.Options + dir string + fileName string + fullFilePath string +} + +func (m *fileStore) Init(opts ...store.Option) error { + // m.store.Flush() + for _, o := range opts { + o(&m.options) + } + if m.options.Database == "" { + m.options.Database = "default" + } + if m.options.Table == "" { + // bbolt requires bucketname to not be empty + m.options.Table = "default" + } + dir := filepath.Join(os.TempDir(), "micro") + fname := m.options.Database + ".db" + _ = os.Mkdir(dir, 0700) + m.dir = dir + m.fileName = fname + m.fullFilePath = filepath.Join(dir, fname) + return nil +} + +func (m *fileStore) String() string { + return "local" +} + +func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + readOpts := store.ReadOptions{} + for _, o := range opts { + o(&readOpts) + } + + var keys []string + + // Handle Prefix / suffix + if readOpts.Prefix || readOpts.Suffix { + var opts []store.ListOption + if readOpts.Prefix { + opts = append(opts, store.ListPrefix(key)) + } + if readOpts.Suffix { + opts = append(opts, store.ListSuffix(key)) + } + opts = append(opts, store.ListLimit(readOpts.Limit)) + opts = append(opts, store.ListOffset(readOpts.Offset)) + k, err := m.List(opts...) + if err != nil { + return nil, errors.Wrap(err, "FileStore: Read couldn't List()") + } + keys = k + } else { + keys = []string{key} + } + + var results []*store.Record + for _, k := range keys { + r, err := m.get(k) + if err != nil { + return results, err + } + results = append(results, r) + } + return results, nil +} + +func (m *fileStore) get(k string) (*store.Record, error) { + if len(m.options.Table) > 0 { + k = m.options.Table + "/" + k + } + if len(m.options.Database) > 0 { + k = m.options.Database + "/" + k + } + store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return nil, err + } + defer store.Close() + err = store.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(m.options.Table)) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + var value []byte + store.View(func(tx *bolt.Tx) error { + // @todo this is still very experimental... + bucket := tx.Bucket([]byte(m.options.Table)) + value = bucket.Get([]byte(k)) + return nil + }) + if value == nil { + return nil, micro_store.ErrNotFound + } + storedRecord := &internalRecord{} + err = json.Unmarshal(value, storedRecord) + if err != nil { + return nil, err + } + newRecord := µ_store.Record{} + newRecord.Key = storedRecord.Key + newRecord.Value = storedRecord.Value + if !storedRecord.ExpiresAt.IsZero() { + if storedRecord.ExpiresAt.Before(time.Now()) { + return nil, micro_store.ErrNotFound + } + newRecord.Expiry = time.Until(storedRecord.ExpiresAt) + } + + return newRecord, nil +} + +func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { + writeOpts := store.WriteOptions{} + for _, o := range opts { + o(&writeOpts) + } + + if len(opts) > 0 { + // Copy the record before applying options, or the incoming record will be mutated + newRecord := store.Record{} + newRecord.Key = r.Key + newRecord.Value = r.Value + newRecord.Expiry = r.Expiry + + if !writeOpts.Expiry.IsZero() { + newRecord.Expiry = time.Until(writeOpts.Expiry) + } + if writeOpts.TTL != 0 { + newRecord.Expiry = writeOpts.TTL + } + return m.set(&newRecord) + } + return m.set(r) +} + +func (m *fileStore) set(r *store.Record) error { + key := r.Key + if len(m.options.Table) > 0 { + key = m.options.Table + "/" + key + } + if len(m.options.Database) > 0 { + key = m.options.Database + "/" + key + } + + // copy the incoming record and then + // convert the expiry in to a hard timestamp + i := &internalRecord{} + i.Key = r.Key + i.Value = r.Value + if r.Expiry != 0 { + i.ExpiresAt = time.Now().Add(r.Expiry) + } + + iJSON, _ := json.Marshal(i) + + store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return err + } + defer store.Close() + return store.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(m.options.Table)) + if b == nil { + var err error + b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) + if err != nil { + return err + } + } + return b.Put([]byte(key), iJSON) + }) +} + +func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { + deleteOptions := store.DeleteOptions{} + for _, o := range opts { + o(&deleteOptions) + } + return m.delete(key) +} + +func (m *fileStore) delete(key string) error { + if len(m.options.Table) > 0 { + key = m.options.Table + "/" + key + } + if len(m.options.Database) > 0 { + key = m.options.Database + "/" + key + } + store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return err + } + defer store.Close() + return store.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(m.options.Table)) + if b == nil { + var err error + b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) + if err != nil { + return err + } + } + err := b.Delete([]byte(key)) + return err + }) +} + +func (m *fileStore) deleteAll() error { + return os.Remove(m.fullFilePath) +} + +func (m *fileStore) Options() store.Options { + return m.options +} + +func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { + listOptions := store.ListOptions{} + + for _, o := range opts { + o(&listOptions) + } + allKeys := m.list(listOptions.Limit, listOptions.Offset) + + if len(listOptions.Prefix) > 0 { + var prefixKeys []string + for _, k := range allKeys { + if strings.HasPrefix(k, listOptions.Prefix) { + prefixKeys = append(prefixKeys, k) + } + } + allKeys = prefixKeys + } + if len(listOptions.Suffix) > 0 { + var suffixKeys []string + for _, k := range allKeys { + if strings.HasSuffix(k, listOptions.Suffix) { + suffixKeys = append(suffixKeys, k) + } + } + allKeys = suffixKeys + } + + return allKeys, nil +} + +func (m *fileStore) list(limit, offset uint) []string { + allItems := []string{} + store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + fmt.Println("Error creating file:", err) + } + defer store.Close() + store.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(m.options.Table)) + if b == nil { + var err error + b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) + if err != nil { + return err + } + } + // @todo very inefficient + if err := b.ForEach(func(k, v []byte) error { + storedRecord := &internalRecord{} + err := json.Unmarshal(v, storedRecord) + if err != nil { + return err + } + if !storedRecord.ExpiresAt.IsZero() { + if storedRecord.ExpiresAt.Before(time.Now()) { + return nil + } + } + allItems = append(allItems, string(k)) + return nil + }); err != nil { + return err + } + + return nil + }) + allKeys := make([]string, len(allItems)) + i := 0 + for _, k := range allItems { + if len(m.options.Database) > 0 { + k = strings.TrimPrefix(k, m.options.Database+"/") + } + if len(m.options.Table) > 0 { + k = strings.TrimPrefix(k, m.options.Table+"/") + } + allKeys[i] = k + i++ + } + if limit != 0 || offset != 0 { + sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) + min := func(i, j uint) uint { + if i < j { + return i + } + return j + } + return allKeys[offset:min(limit, uint(len(allKeys)))] + } + return allKeys +} + +type internalRecord struct { + Key string + Value []byte + ExpiresAt time.Time +} diff --git a/file_test.go b/file_test.go new file mode 100644 index 0000000..26a20cd --- /dev/null +++ b/file_test.go @@ -0,0 +1,281 @@ +package file + +import ( + "fmt" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/kr/pretty" + "github.com/micro/go-micro/v2/store" +) + +func TestFileReInit(t *testing.T) { + s := NewStore(store.Table("aaa")) + s.Init(store.Table("bbb")) + if s.Options().Table != "bbb" { + t.Error("Init didn't reinitialise the store") + } +} + +func TestFileBasic(t *testing.T) { + s := NewStore() + s.Init() + if err := s.(*fileStore).deleteAll(); err != nil { + t.Logf("Can't delete all: %v", err) + } + basictest(s, t) +} + +func TestFileTable(t *testing.T) { + s := NewStore() + s.Init(store.Table("some-Table")) + if err := s.(*fileStore).deleteAll(); err != nil { + t.Logf("Can't delete all: %v", err) + } + basictest(s, t) +} + +func TestFileDatabase(t *testing.T) { + s := NewStore() + s.Init(store.Database("some-Database")) + if err := s.(*fileStore).deleteAll(); err != nil { + t.Logf("Can't delete all: %v", err) + } + basictest(s, t) +} + +func TestFileDatabaseTable(t *testing.T) { + s := NewStore() + s.Init(store.Table("some-Table"), store.Database("some-Database")) + if err := s.(*fileStore).deleteAll(); err != nil { + t.Logf("Can't delete all: %v", err) + } + basictest(s, t) +} + +func basictest(s store.Store, t *testing.T) { + t.Logf("Testing store %s, with options %# v\n", s.String(), pretty.Formatter(s.Options())) + // Read and Write an expiring Record + if err := s.Write(&store.Record{ + Key: "Hello", + Value: []byte("World"), + Expiry: time.Millisecond * 100, + }); err != nil { + t.Error(err) + } + if r, err := s.Read("Hello"); err != nil { + t.Error(err) + } else { + if len(r) != 1 { + t.Error("Read returned multiple records") + } + if r[0].Key != "Hello" { + t.Errorf("Expected %s, got %s", "Hello", r[0].Key) + } + if string(r[0].Value) != "World" { + t.Errorf("Expected %s, got %s", "World", r[0].Value) + } + } + time.Sleep(time.Millisecond * 200) + if _, err := s.Read("Hello"); err != store.ErrNotFound { + t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err) + } + + // Write 3 records with various expiry and get with Table + records := []*store.Record{ + &store.Record{ + Key: "foo", + Value: []byte("foofoo"), + }, + &store.Record{ + Key: "foobar", + Value: []byte("foobarfoobar"), + Expiry: time.Millisecond * 100, + }, + &store.Record{ + Key: "foobarbaz", + Value: []byte("foobarbazfoobarbaz"), + Expiry: 2 * time.Millisecond * 100, + }, + } + for _, r := range records { + if err := s.Write(r); err != nil { + t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) + } + } + if results, err := s.Read("foo", store.ReadPrefix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 3 { + t.Errorf("Expected 3 items, got %d", len(results)) + t.Logf("Table test: %v\n", spew.Sdump(results)) + } + } + time.Sleep(time.Millisecond * 100) + if results, err := s.Read("foo", store.ReadPrefix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 2 { + t.Errorf("Expected 2 items, got %d", len(results)) + t.Logf("Table test: %v\n", spew.Sdump(results)) + } + } + time.Sleep(time.Millisecond * 100) + if results, err := s.Read("foo", store.ReadPrefix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 1 { + t.Errorf("Expected 1 item, got %d", len(results)) + t.Logf("Table test: %# v\n", spew.Sdump(results)) + } + } + if err := s.Delete("foo", func(d *store.DeleteOptions) {}); err != nil { + t.Errorf("Delete failed (%v)", err) + } + if results, err := s.Read("foo", store.ReadPrefix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 0 { + t.Errorf("Expected 0 items, got %d (%# v)", len(results), spew.Sdump(results)) + } + } + + // Write 3 records with various expiry and get with Suffix + records = []*store.Record{ + &store.Record{ + Key: "foo", + Value: []byte("foofoo"), + }, + &store.Record{ + Key: "barfoo", + Value: []byte("barfoobarfoo"), + Expiry: time.Millisecond * 100, + }, + &store.Record{ + Key: "bazbarfoo", + Value: []byte("bazbarfoobazbarfoo"), + Expiry: 2 * time.Millisecond * 100, + }, + } + for _, r := range records { + if err := s.Write(r); err != nil { + t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) + } + } + if results, err := s.Read("foo", store.ReadSuffix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 3 { + t.Errorf("Expected 3 items, got %d", len(results)) + t.Logf("Table test: %v\n", spew.Sdump(results)) + } + + } + time.Sleep(time.Millisecond * 100) + if results, err := s.Read("foo", store.ReadSuffix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 2 { + t.Errorf("Expected 2 items, got %d", len(results)) + t.Logf("Table test: %v\n", spew.Sdump(results)) + } + + } + time.Sleep(time.Millisecond * 100) + if results, err := s.Read("foo", store.ReadSuffix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 1 { + t.Errorf("Expected 1 item, got %d", len(results)) + t.Logf("Table test: %# v\n", spew.Sdump(results)) + } + } + if err := s.Delete("foo"); err != nil { + t.Errorf("Delete failed (%v)", err) + } + if results, err := s.Read("foo", store.ReadSuffix()); err != nil { + t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + } else { + if len(results) != 0 { + t.Errorf("Expected 0 items, got %d (%# v)", len(results), spew.Sdump(results)) + } + } + + // Test Table, Suffix and WriteOptions + if err := s.Write(&store.Record{ + Key: "foofoobarbar", + Value: []byte("something"), + }, store.WriteTTL(time.Millisecond*100)); err != nil { + t.Error(err) + } + if err := s.Write(&store.Record{ + Key: "foofoo", + Value: []byte("something"), + }, store.WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil { + t.Error(err) + } + if err := s.Write(&store.Record{ + Key: "barbar", + Value: []byte("something"), + // TTL has higher precedence than expiry + }, store.WriteExpiry(time.Now().Add(time.Hour)), store.WriteTTL(time.Millisecond*100)); err != nil { + t.Error(err) + } + if results, err := s.Read("foo", store.ReadPrefix(), store.ReadSuffix()); err != nil { + t.Error(err) + } else { + if len(results) != 1 { + t.Errorf("Expected 1 results, got %d: %# v", len(results), spew.Sdump(results)) + } + } + time.Sleep(time.Millisecond * 100) + if results, err := s.List(); err != nil { + t.Errorf("List failed: %s", err) + } else { + if len(results) != 0 { + t.Errorf("Expiry options were not effective, results :%v", spew.Sdump(results)) + } + } + s.Write(&store.Record{Key: "a", Value: []byte("a")}) + s.Write(&store.Record{Key: "aa", Value: []byte("aa")}) + s.Write(&store.Record{Key: "aaa", Value: []byte("aaa")}) + if results, err := s.Read("b", store.ReadPrefix()); err != nil { + t.Error(err) + } else { + if len(results) != 0 { + t.Errorf("Expected 0 results, got %d", len(results)) + } + } + + s.Init() + if err := s.(*fileStore).deleteAll(); err != nil { + t.Logf("Can't delete all: %v", err) + } + for i := 0; i < 10; i++ { + s.Write(&store.Record{ + Key: fmt.Sprintf("a%d", i), + Value: []byte{}, + }) + } + if results, err := s.Read("a", store.ReadLimit(5), store.ReadPrefix()); err != nil { + t.Error(err) + } else { + if len(results) != 5 { + t.Error("Expected 5 results, got ", len(results)) + } + if results[0].Key != "a0" { + t.Errorf("Expected a0, got %s", results[0].Key) + } + if results[4].Key != "a4" { + t.Errorf("Expected a4, got %s", results[4].Key) + } + } + if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil { + t.Error(err) + } else { + if len(results) != 5 { + t.Error("Expected 5 results, got ", len(results)) + } + } +}