diff --git a/file.go b/file.go index 26df09e..2f92438 100644 --- a/file.go +++ b/file.go @@ -3,7 +3,6 @@ package file import ( "encoding/json" - "fmt" "os" "path/filepath" "sort" @@ -11,10 +10,8 @@ import ( "time" "github.com/micro/go-micro/v2/store" - micro_store "github.com/micro/go-micro/v2/store" - bolt "go.etcd.io/bbolt" - "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" ) var ( @@ -29,49 +26,213 @@ var ( // NewStore returns a memory store func NewStore(opts ...store.Option) store.Store { - s := &fileStore{ - options: store.Options{}, - } + s := &fileStore{} s.init(opts...) return s } type fileStore struct { - options store.Options - dir string - fileName string - fullFilePath string + options store.Options + dir string + fileName string + dbPath string + // the database handle + db *bolt.DB } -func (m *fileStore) Init(opts ...store.Option) error { - return m.init(opts...) +// record stored by us +type record struct { + Key string + Value []byte + ExpiresAt time.Time +} + +func (m *fileStore) delete(key string) error { + return m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(m.options.Table)) + if b == nil { + return nil + } + return b.Delete([]byte(key)) + }) } func (m *fileStore) init(opts ...store.Option) error { for _, o := range opts { o(&m.options) } + if m.options.Database == "" { m.options.Database = DefaultDatabase } + if m.options.Table == "" { // bbolt requires bucketname to not be empty m.options.Table = DefaultTable } + + // create a directory /tmp/micro dir := filepath.Join(DefaultDir, "micro") + // create the database handle fname := m.options.Database + ".db" // Ignoring this as the folder might exist. // Reads/Writes updates will return with sensible error messages // about the dir not existing in case this cannot create the path anyway _ = os.Mkdir(dir, 0700) + m.dir = dir m.fileName = fname - m.fullFilePath = filepath.Join(dir, fname) - return nil + m.dbPath = filepath.Join(dir, fname) + + // close existing handle + if m.db != nil { + m.db.Close() + } + + // create new db handle + db, err := bolt.Open(m.dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) + if err != nil { + return err + } + + // set the new db + m.db = db + + // create the table + return db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(m.options.Table)) + return err + }) } -func (m *fileStore) String() string { - return "local" +func (m *fileStore) list(limit, offset uint) []string { + var allItems []string + + m.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(m.options.Table)) + // nothing to read + if b == nil { + return nil + } + + // @todo very inefficient + if err := b.ForEach(func(k, v []byte) error { + storedRecord := &record{} + + if err := json.Unmarshal(v, storedRecord); err != nil { + return err + } + + if !storedRecord.ExpiresAt.IsZero() { + if storedRecord.ExpiresAt.Before(time.Now()) { + return nil + } + } + + allItems = append(allItems, string(k)) + + return nil + }); err != nil { + return err + } + + return nil + }) + + allKeys := make([]string, len(allItems)) + + for i, k := range allItems { + allKeys[i] = k + } + + if limit != 0 || offset != 0 { + sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) + min := func(i, j uint) uint { + if i < j { + return i + } + return j + } + return allKeys[offset:min(limit, uint(len(allKeys)))] + } + + return allKeys +} + +func (m *fileStore) get(k string) (*store.Record, error) { + var value []byte + + m.db.View(func(tx *bolt.Tx) error { + // @todo this is still very experimental... + b := tx.Bucket([]byte(m.options.Table)) + if b == nil { + return nil + } + + value = b.Get([]byte(k)) + return nil + }) + + if value == nil { + return nil, store.ErrNotFound + } + + storedRecord := &record{} + + if err := json.Unmarshal(value, storedRecord); err != nil { + return nil, err + } + + newRecord := &store.Record{} + newRecord.Key = storedRecord.Key + newRecord.Value = storedRecord.Value + + if !storedRecord.ExpiresAt.IsZero() { + if storedRecord.ExpiresAt.Before(time.Now()) { + return nil, store.ErrNotFound + } + newRecord.Expiry = time.Until(storedRecord.ExpiresAt) + } + + return newRecord, nil +} + +func (m *fileStore) set(r *store.Record) error { + // copy the incoming record and then + // convert the expiry in to a hard timestamp + item := &record{} + item.Key = r.Key + item.Value = r.Value + if r.Expiry != 0 { + item.ExpiresAt = time.Now().Add(r.Expiry) + } + + // marshal the data + data, _ := json.Marshal(item) + + return m.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(m.options.Table)) + if b == nil { + var err error + b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) + if err != nil { + return err + } + } + return b.Put([]byte(r.Key), data) + }) +} + +func (m *fileStore) Init(opts ...store.Option) error { + return m.init(opts...) +} + +func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { + deleteOptions := store.DeleteOptions{} + for _, o := range opts { + o(&deleteOptions) + } + return m.delete(key) } func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { @@ -83,6 +244,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, var keys []string // Handle Prefix / suffix + // TODO: do range scan here rather than listing all keys if readOpts.Prefix || readOpts.Suffix { var opts []store.ListOption if readOpts.Prefix { @@ -91,18 +253,22 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, if readOpts.Suffix { opts = append(opts, store.ListSuffix(key)) } + opts = append(opts, store.ListLimit(readOpts.Limit)) opts = append(opts, store.ListOffset(readOpts.Offset)) + k, err := m.List(opts...) if err != nil { return nil, errors.Wrap(err, "FileStore: Read couldn't List()") } + keys = k } else { keys = []string{key} } var results []*store.Record + for _, k := range keys { r, err := m.get(k) if err != nil { @@ -110,59 +276,10 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, } results = append(results, r) } + return results, nil } -func (m *fileStore) get(k string) (*store.Record, error) { - if len(m.options.Table) > 0 { - k = m.options.Table + "/" + k - } - if len(m.options.Database) > 0 { - k = m.options.Database + "/" + k - } - store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - return nil, err - } - defer store.Close() - err = store.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte(m.options.Table)) - if err != nil { - return err - } - return nil - }) - if err != nil { - return nil, err - } - var value []byte - store.View(func(tx *bolt.Tx) error { - // @todo this is still very experimental... - bucket := tx.Bucket([]byte(m.options.Table)) - value = bucket.Get([]byte(k)) - return nil - }) - if value == nil { - return nil, micro_store.ErrNotFound - } - storedRecord := &internalRecord{} - err = json.Unmarshal(value, storedRecord) - if err != nil { - return nil, err - } - newRecord := µ_store.Record{} - newRecord.Key = storedRecord.Key - newRecord.Value = storedRecord.Value - if !storedRecord.ExpiresAt.IsZero() { - if storedRecord.ExpiresAt.Before(time.Now()) { - return nil, micro_store.ErrNotFound - } - newRecord.Expiry = time.Until(storedRecord.ExpiresAt) - } - - return newRecord, nil -} - func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { writeOpts := store.WriteOptions{} for _, o := range opts { @@ -182,87 +299,13 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { if writeOpts.TTL != 0 { newRecord.Expiry = writeOpts.TTL } + return m.set(&newRecord) } + return m.set(r) } -func (m *fileStore) set(r *store.Record) error { - key := r.Key - if len(m.options.Table) > 0 { - key = m.options.Table + "/" + key - } - if len(m.options.Database) > 0 { - key = m.options.Database + "/" + key - } - - // copy the incoming record and then - // convert the expiry in to a hard timestamp - i := &internalRecord{} - i.Key = r.Key - i.Value = r.Value - if r.Expiry != 0 { - i.ExpiresAt = time.Now().Add(r.Expiry) - } - - iJSON, _ := json.Marshal(i) - - store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - return err - } - defer store.Close() - return store.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(m.options.Table)) - if b == nil { - var err error - b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) - if err != nil { - return err - } - } - return b.Put([]byte(key), iJSON) - }) -} - -func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { - deleteOptions := store.DeleteOptions{} - for _, o := range opts { - o(&deleteOptions) - } - return m.delete(key) -} - -func (m *fileStore) delete(key string) error { - if len(m.options.Table) > 0 { - key = m.options.Table + "/" + key - } - if len(m.options.Database) > 0 { - key = m.options.Database + "/" + key - } - store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - return err - } - defer store.Close() - return store.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(m.options.Table)) - if b == nil { - var err error - b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) - if err != nil { - return err - } - } - err := b.Delete([]byte(key)) - return err - }) -} - -func (m *fileStore) deleteAll() error { - return os.Remove(m.fullFilePath) -} - func (m *fileStore) Options() store.Options { return m.options } @@ -273,6 +316,8 @@ func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { for _, o := range opts { o(&listOptions) } + + // TODO apply prefix/suffix in range query allKeys := m.list(listOptions.Limit, listOptions.Offset) if len(listOptions.Prefix) > 0 { @@ -284,6 +329,7 @@ func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { } allKeys = prefixKeys } + if len(listOptions.Suffix) > 0 { var suffixKeys []string for _, k := range allKeys { @@ -297,69 +343,6 @@ func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { return allKeys, nil } -func (m *fileStore) list(limit, offset uint) []string { - allItems := []string{} - store, err := bolt.Open(m.fullFilePath, 0700, &bolt.Options{Timeout: 1 * time.Second}) - if err != nil { - fmt.Println("Error creating file:", err) - } - defer store.Close() - store.Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(m.options.Table)) - if b == nil { - var err error - b, err = tx.CreateBucketIfNotExists([]byte(m.options.Table)) - if err != nil { - return err - } - } - // @todo very inefficient - if err := b.ForEach(func(k, v []byte) error { - storedRecord := &internalRecord{} - err := json.Unmarshal(v, storedRecord) - if err != nil { - return err - } - if !storedRecord.ExpiresAt.IsZero() { - if storedRecord.ExpiresAt.Before(time.Now()) { - return nil - } - } - allItems = append(allItems, string(k)) - return nil - }); err != nil { - return err - } - - return nil - }) - allKeys := make([]string, len(allItems)) - i := 0 - for _, k := range allItems { - if len(m.options.Database) > 0 { - k = strings.TrimPrefix(k, m.options.Database+"/") - } - if len(m.options.Table) > 0 { - k = strings.TrimPrefix(k, m.options.Table+"/") - } - allKeys[i] = k - i++ - } - if limit != 0 || offset != 0 { - sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) - min := func(i, j uint) uint { - if i < j { - return i - } - return j - } - return allKeys[offset:min(limit, uint(len(allKeys)))] - } - return allKeys -} - -type internalRecord struct { - Key string - Value []byte - ExpiresAt time.Time +func (m *fileStore) String() string { + return "file" } diff --git a/file_test.go b/file_test.go index 26a20cd..86c533f 100644 --- a/file_test.go +++ b/file_test.go @@ -2,6 +2,9 @@ package file import ( "fmt" + "os" + "path/filepath" + "strings" "testing" "time" @@ -10,7 +13,13 @@ import ( "github.com/micro/go-micro/v2/store" ) -func TestFileReInit(t *testing.T) { +func cleanup() { + dir := filepath.Join(DefaultDir, "micro/") + os.RemoveAll(dir) +} + +func TestFileStoreReInit(t *testing.T) { + defer cleanup() s := NewStore(store.Table("aaa")) s.Init(store.Table("bbb")) if s.Options().Table != "bbb" { @@ -18,54 +27,44 @@ func TestFileReInit(t *testing.T) { } } -func TestFileBasic(t *testing.T) { +func TestFileStoreBasic(t *testing.T) { + defer cleanup() s := NewStore() - s.Init() - if err := s.(*fileStore).deleteAll(); err != nil { - t.Logf("Can't delete all: %v", err) - } - basictest(s, t) + fileTest(s, t) } -func TestFileTable(t *testing.T) { - s := NewStore() - s.Init(store.Table("some-Table")) - if err := s.(*fileStore).deleteAll(); err != nil { - t.Logf("Can't delete all: %v", err) - } - basictest(s, t) +func TestFileStoreTable(t *testing.T) { + defer cleanup() + s := NewStore(store.Table("testTable")) + fileTest(s, t) } -func TestFileDatabase(t *testing.T) { - s := NewStore() - s.Init(store.Database("some-Database")) - if err := s.(*fileStore).deleteAll(); err != nil { - t.Logf("Can't delete all: %v", err) - } - basictest(s, t) +func TestFileStoreDatabase(t *testing.T) { + defer cleanup() + s := NewStore(store.Database("testdb")) + fileTest(s, t) } -func TestFileDatabaseTable(t *testing.T) { - s := NewStore() - s.Init(store.Table("some-Table"), store.Database("some-Database")) - if err := s.(*fileStore).deleteAll(); err != nil { - t.Logf("Can't delete all: %v", err) - } - basictest(s, t) +func TestFileStoreDatabaseTable(t *testing.T) { + defer cleanup() + s := NewStore(store.Table("testTable"), store.Database("testdb")) + fileTest(s, t) } -func basictest(s store.Store, t *testing.T) { - t.Logf("Testing store %s, with options %# v\n", s.String(), pretty.Formatter(s.Options())) +func fileTest(s store.Store, t *testing.T) { + t.Logf("Options %s %v\n", s.String(), s.Options()) + // Read and Write an expiring Record if err := s.Write(&store.Record{ Key: "Hello", Value: []byte("World"), - Expiry: time.Millisecond * 100, + Expiry: time.Millisecond * 150, }); err != nil { t.Error(err) } + if r, err := s.Read("Hello"); err != nil { - t.Error(err) + t.Fatal(err) } else { if len(r) != 1 { t.Error("Read returned multiple records") @@ -77,7 +76,10 @@ func basictest(s store.Store, t *testing.T) { t.Errorf("Expected %s, got %s", "World", r[0].Value) } } + + // wait for expiry time.Sleep(time.Millisecond * 200) + if _, err := s.Read("Hello"); err != store.ErrNotFound { t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err) } @@ -93,26 +95,14 @@ func basictest(s store.Store, t *testing.T) { Value: []byte("foobarfoobar"), Expiry: time.Millisecond * 100, }, - &store.Record{ - Key: "foobarbaz", - Value: []byte("foobarbazfoobarbaz"), - Expiry: 2 * time.Millisecond * 100, - }, } + for _, r := range records { if err := s.Write(r); err != nil { t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) } } - if results, err := s.Read("foo", store.ReadPrefix()); err != nil { - t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) - } else { - if len(results) != 3 { - t.Errorf("Expected 3 items, got %d", len(results)) - t.Logf("Table test: %v\n", spew.Sdump(results)) - } - } - time.Sleep(time.Millisecond * 100) + if results, err := s.Read("foo", store.ReadPrefix()); err != nil { t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) } else { @@ -121,20 +111,23 @@ func basictest(s store.Store, t *testing.T) { t.Logf("Table test: %v\n", spew.Sdump(results)) } } - time.Sleep(time.Millisecond * 100) + + // wait for the expiry + time.Sleep(time.Millisecond * 200) + if results, err := s.Read("foo", store.ReadPrefix()); err != nil { t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) - } else { - if len(results) != 1 { - t.Errorf("Expected 1 item, got %d", len(results)) - t.Logf("Table test: %# v\n", spew.Sdump(results)) - } + } else if len(results) != 1 { + t.Errorf("Expected 1 item, got %d", len(results)) + t.Logf("Table test: %v\n", spew.Sdump(results)) } - if err := s.Delete("foo", func(d *store.DeleteOptions) {}); err != nil { + + if err := s.Delete("foo"); err != nil { t.Errorf("Delete failed (%v)", err) } - if results, err := s.Read("foo", store.ReadPrefix()); err != nil { - t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) + + if results, err := s.Read("foo"); err != store.ErrNotFound { + t.Errorf("Expected read failure read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) } else { if len(results) != 0 { t.Errorf("Expected 0 items, got %d (%# v)", len(results), spew.Sdump(results)) @@ -148,8 +141,9 @@ func basictest(s store.Store, t *testing.T) { Value: []byte("foofoo"), }, &store.Record{ - Key: "barfoo", - Value: []byte("barfoobarfoo"), + Key: "barfoo", + Value: []byte("barfoobarfoo"), + Expiry: time.Millisecond * 100, }, &store.Record{ @@ -222,6 +216,7 @@ func basictest(s store.Store, t *testing.T) { }, store.WriteExpiry(time.Now().Add(time.Hour)), store.WriteTTL(time.Millisecond*100)); err != nil { t.Error(err) } + if results, err := s.Read("foo", store.ReadPrefix(), store.ReadSuffix()); err != nil { t.Error(err) } else { @@ -229,7 +224,9 @@ func basictest(s store.Store, t *testing.T) { t.Errorf("Expected 1 results, got %d: %# v", len(results), spew.Sdump(results)) } } + time.Sleep(time.Millisecond * 100) + if results, err := s.List(); err != nil { t.Errorf("List failed: %s", err) } else { @@ -237,40 +234,28 @@ func basictest(s store.Store, t *testing.T) { t.Errorf("Expiry options were not effective, results :%v", spew.Sdump(results)) } } - s.Write(&store.Record{Key: "a", Value: []byte("a")}) - s.Write(&store.Record{Key: "aa", Value: []byte("aa")}) - s.Write(&store.Record{Key: "aaa", Value: []byte("aaa")}) - if results, err := s.Read("b", store.ReadPrefix()); err != nil { - t.Error(err) - } else { - if len(results) != 0 { - t.Errorf("Expected 0 results, got %d", len(results)) - } - } - s.Init() - if err := s.(*fileStore).deleteAll(); err != nil { - t.Logf("Can't delete all: %v", err) - } + // write the following records for i := 0; i < 10; i++ { s.Write(&store.Record{ Key: fmt.Sprintf("a%d", i), Value: []byte{}, }) } + + // read back a few records if results, err := s.Read("a", store.ReadLimit(5), store.ReadPrefix()); err != nil { t.Error(err) } else { if len(results) != 5 { t.Error("Expected 5 results, got ", len(results)) } - if results[0].Key != "a0" { - t.Errorf("Expected a0, got %s", results[0].Key) - } - if results[4].Key != "a4" { - t.Errorf("Expected a4, got %s", results[4].Key) + if !strings.HasPrefix(results[0].Key, "a") { + t.Errorf("Expected a prefix, got %s", results[0].Key) } } + + // read the rest back if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil { t.Error(err) } else {