From e036c9c9a86dc98a03ef04b6a47376d49ae66475 Mon Sep 17 00:00:00 2001 From: Janos Dobronszki Date: Tue, 14 Jul 2020 10:35:46 +0200 Subject: [PATCH] =?UTF-8?q?Let=20bolt=20do=20locking=20per=20each=20List?= =?UTF-8?q?=20Get=20etc=20op,=20instead=20of=20managing=20fil=E2=80=A6=20(?= =?UTF-8?q?#1831)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- file.go | 86 +++++++++++++++++---------------------------------------- 1 file changed, 25 insertions(+), 61 deletions(-) diff --git a/file.go b/file.go index e685628..36f553b 100644 --- a/file.go +++ b/file.go @@ -7,7 +7,6 @@ import ( "path/filepath" "sort" "strings" - "sync" "time" "github.com/micro/go-micro/v2/store" @@ -29,9 +28,7 @@ var ( // NewStore returns a memory store func NewStore(opts ...store.Option) store.Store { - s := &fileStore{ - handles: make(map[string]*fileHandle), - } + s := &fileStore{} s.init(opts...) return s } @@ -39,10 +36,6 @@ func NewStore(opts ...store.Option) store.Store { type fileStore struct { options store.Options dir string - - // the database handle - sync.RWMutex - handles map[string]*fileHandle } type fileHandle struct { @@ -62,8 +55,8 @@ func key(database, table string) string { return database + ":" + table } -func (m *fileStore) delete(fd *fileHandle, key string) error { - return fd.db.Update(func(tx *bolt.Tx) error { +func (m *fileStore) delete(db *bolt.DB, key string) error { + return db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) if b == nil { return nil @@ -96,7 +89,7 @@ func (m *fileStore) init(opts ...store.Option) error { return nil } -func (f *fileStore) getDB(database, table string) (*fileHandle, error) { +func (f *fileStore) getDB(database, table string) (*bolt.DB, error) { if len(database) == 0 { database = f.options.Database } @@ -104,23 +97,6 @@ func (f *fileStore) getDB(database, table string) (*fileHandle, error) { table = f.options.Table } - k := key(database, table) - f.RLock() - fd, ok := f.handles[k] - f.RUnlock() - - // return the file handle - if ok { - return fd, nil - } - - // double check locking - f.Lock() - defer f.Unlock() - if fd, ok := f.handles[k]; ok { - return fd, nil - } - // create a directory /tmp/micro dir := filepath.Join(DefaultDir, database) // create the database handle @@ -132,23 +108,13 @@ func (f *fileStore) getDB(database, table string) (*fileHandle, error) { // create new db handle // Bolt DB only allows one process to open the file R/W so make sure we're doing this under a lock - db, err := bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) - if err != nil { - return nil, err - } - fd = &fileHandle{ - key: k, - db: db, - } - f.handles[k] = fd - - return fd, nil + return bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) } -func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string { +func (m *fileStore) list(db *bolt.DB, limit, offset uint) []string { var allItems []string - fd.db.View(func(tx *bolt.Tx) error { + db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) // nothing to read if b == nil { @@ -199,10 +165,10 @@ func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string { return allKeys } -func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) { +func (m *fileStore) get(db *bolt.DB, k string) (*store.Record, error) { var value []byte - fd.db.View(func(tx *bolt.Tx) error { + db.View(func(tx *bolt.Tx) error { // @todo this is still very experimental... b := tx.Bucket([]byte(dataBucket)) if b == nil { @@ -242,7 +208,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) { return newRecord, nil } -func (m *fileStore) set(fd *fileHandle, r *store.Record) error { +func (m *fileStore) set(db *bolt.DB, r *store.Record) error { // copy the incoming record and then // convert the expiry in to a hard timestamp item := &record{} @@ -261,7 +227,7 @@ func (m *fileStore) set(fd *fileHandle, r *store.Record) error { // marshal the data data, _ := json.Marshal(item) - return fd.db.Update(func(tx *bolt.Tx) error { + return db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) if b == nil { var err error @@ -275,12 +241,6 @@ func (m *fileStore) set(fd *fileHandle, r *store.Record) error { } func (f *fileStore) Close() error { - f.Lock() - defer f.Unlock() - for k, v := range f.handles { - v.db.Close() - delete(f.handles, k) - } return nil } @@ -294,12 +254,13 @@ func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { o(&deleteOptions) } - fd, err := m.getDB(deleteOptions.Database, deleteOptions.Table) + db, err := m.getDB(deleteOptions.Database, deleteOptions.Table) if err != nil { return err } + defer db.Close() - return m.delete(fd, key) + return m.delete(db, key) } func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { @@ -308,10 +269,11 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, o(&readOpts) } - fd, err := m.getDB(readOpts.Database, readOpts.Table) + db, err := m.getDB(readOpts.Database, readOpts.Table) if err != nil { return nil, err } + defer db.Close() var keys []string @@ -319,7 +281,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, // TODO: do range scan here rather than listing all keys if readOpts.Prefix || readOpts.Suffix { // list the keys - k := m.list(fd, readOpts.Limit, readOpts.Offset) + k := m.list(db, readOpts.Limit, readOpts.Offset) // check for prefix and suffix for _, v := range k { @@ -338,7 +300,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, var results []*store.Record for _, k := range keys { - r, err := m.get(fd, k) + r, err := m.get(db, k) if err != nil { return results, err } @@ -354,10 +316,11 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { o(&writeOpts) } - fd, err := m.getDB(writeOpts.Database, writeOpts.Table) + db, err := m.getDB(writeOpts.Database, writeOpts.Table) if err != nil { return err } + defer db.Close() if len(opts) > 0 { // Copy the record before applying options, or the incoming record will be mutated @@ -378,10 +341,10 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { newRecord.Metadata[k] = v } - return m.set(fd, &newRecord) + return m.set(db, &newRecord) } - return m.set(fd, r) + return m.set(db, r) } func (m *fileStore) Options() store.Options { @@ -395,13 +358,14 @@ func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { o(&listOptions) } - fd, err := m.getDB(listOptions.Database, listOptions.Table) + db, err := m.getDB(listOptions.Database, listOptions.Table) if err != nil { return nil, err } + defer db.Close() // TODO apply prefix/suffix in range query - allKeys := m.list(fd, listOptions.Limit, listOptions.Offset) + allKeys := m.list(db, listOptions.Limit, listOptions.Offset) if len(listOptions.Prefix) > 0 { var prefixKeys []string