Let bolt do locking per each List Get etc op, instead of managing fil… (#1831)
This commit is contained in:
		| @@ -7,7 +7,6 @@ import ( | |||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" |  | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/micro/go-micro/v2/store" | 	"github.com/micro/go-micro/v2/store" | ||||||
| @@ -29,9 +28,7 @@ var ( | |||||||
|  |  | ||||||
| // NewStore returns a memory store | // NewStore returns a memory store | ||||||
| func NewStore(opts ...store.Option) store.Store { | func NewStore(opts ...store.Option) store.Store { | ||||||
| 	s := &fileStore{ | 	s := &fileStore{} | ||||||
| 		handles: make(map[string]*fileHandle), |  | ||||||
| 	} |  | ||||||
| 	s.init(opts...) | 	s.init(opts...) | ||||||
| 	return s | 	return s | ||||||
| } | } | ||||||
| @@ -39,10 +36,6 @@ func NewStore(opts ...store.Option) store.Store { | |||||||
| type fileStore struct { | type fileStore struct { | ||||||
| 	options store.Options | 	options store.Options | ||||||
| 	dir     string | 	dir     string | ||||||
|  |  | ||||||
| 	// the database handle |  | ||||||
| 	sync.RWMutex |  | ||||||
| 	handles map[string]*fileHandle |  | ||||||
| } | } | ||||||
|  |  | ||||||
| type fileHandle struct { | type fileHandle struct { | ||||||
| @@ -62,8 +55,8 @@ func key(database, table string) string { | |||||||
| 	return database + ":" + table | 	return database + ":" + table | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *fileStore) delete(fd *fileHandle, key string) error { | func (m *fileStore) delete(db *bolt.DB, key string) error { | ||||||
| 	return fd.db.Update(func(tx *bolt.Tx) error { | 	return db.Update(func(tx *bolt.Tx) error { | ||||||
| 		b := tx.Bucket([]byte(dataBucket)) | 		b := tx.Bucket([]byte(dataBucket)) | ||||||
| 		if b == nil { | 		if b == nil { | ||||||
| 			return nil | 			return nil | ||||||
| @@ -96,7 +89,7 @@ func (m *fileStore) init(opts ...store.Option) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (f *fileStore) getDB(database, table string) (*fileHandle, error) { | func (f *fileStore) getDB(database, table string) (*bolt.DB, error) { | ||||||
| 	if len(database) == 0 { | 	if len(database) == 0 { | ||||||
| 		database = f.options.Database | 		database = f.options.Database | ||||||
| 	} | 	} | ||||||
| @@ -104,23 +97,6 @@ func (f *fileStore) getDB(database, table string) (*fileHandle, error) { | |||||||
| 		table = f.options.Table | 		table = f.options.Table | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	k := key(database, table) |  | ||||||
| 	f.RLock() |  | ||||||
| 	fd, ok := f.handles[k] |  | ||||||
| 	f.RUnlock() |  | ||||||
|  |  | ||||||
| 	// return the file handle |  | ||||||
| 	if ok { |  | ||||||
| 		return fd, nil |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// double check locking |  | ||||||
| 	f.Lock() |  | ||||||
| 	defer f.Unlock() |  | ||||||
| 	if fd, ok := f.handles[k]; ok { |  | ||||||
| 		return fd, nil |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// create a directory /tmp/micro | 	// create a directory /tmp/micro | ||||||
| 	dir := filepath.Join(DefaultDir, database) | 	dir := filepath.Join(DefaultDir, database) | ||||||
| 	// create the database handle | 	// create the database handle | ||||||
| @@ -132,23 +108,13 @@ func (f *fileStore) getDB(database, table string) (*fileHandle, error) { | |||||||
|  |  | ||||||
| 	// create new db handle | 	// create new db handle | ||||||
| 	// Bolt DB only allows one process to open the file R/W so make sure we're doing this under a lock | 	// Bolt DB only allows one process to open the file R/W so make sure we're doing this under a lock | ||||||
| 	db, err := bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) | 	return bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	fd = &fileHandle{ |  | ||||||
| 		key: k, |  | ||||||
| 		db:  db, |  | ||||||
| 	} |  | ||||||
| 	f.handles[k] = fd |  | ||||||
|  |  | ||||||
| 	return fd, nil |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string { | func (m *fileStore) list(db *bolt.DB, limit, offset uint) []string { | ||||||
| 	var allItems []string | 	var allItems []string | ||||||
|  |  | ||||||
| 	fd.db.View(func(tx *bolt.Tx) error { | 	db.View(func(tx *bolt.Tx) error { | ||||||
| 		b := tx.Bucket([]byte(dataBucket)) | 		b := tx.Bucket([]byte(dataBucket)) | ||||||
| 		// nothing to read | 		// nothing to read | ||||||
| 		if b == nil { | 		if b == nil { | ||||||
| @@ -199,10 +165,10 @@ func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string { | |||||||
| 	return allKeys | 	return allKeys | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) { | func (m *fileStore) get(db *bolt.DB, k string) (*store.Record, error) { | ||||||
| 	var value []byte | 	var value []byte | ||||||
|  |  | ||||||
| 	fd.db.View(func(tx *bolt.Tx) error { | 	db.View(func(tx *bolt.Tx) error { | ||||||
| 		// @todo this is still very experimental... | 		// @todo this is still very experimental... | ||||||
| 		b := tx.Bucket([]byte(dataBucket)) | 		b := tx.Bucket([]byte(dataBucket)) | ||||||
| 		if b == nil { | 		if b == nil { | ||||||
| @@ -242,7 +208,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) { | |||||||
| 	return newRecord, nil | 	return newRecord, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *fileStore) set(fd *fileHandle, r *store.Record) error { | func (m *fileStore) set(db *bolt.DB, r *store.Record) error { | ||||||
| 	// copy the incoming record and then | 	// copy the incoming record and then | ||||||
| 	// convert the expiry in to a hard timestamp | 	// convert the expiry in to a hard timestamp | ||||||
| 	item := &record{} | 	item := &record{} | ||||||
| @@ -261,7 +227,7 @@ func (m *fileStore) set(fd *fileHandle, r *store.Record) error { | |||||||
| 	// marshal the data | 	// marshal the data | ||||||
| 	data, _ := json.Marshal(item) | 	data, _ := json.Marshal(item) | ||||||
|  |  | ||||||
| 	return fd.db.Update(func(tx *bolt.Tx) error { | 	return db.Update(func(tx *bolt.Tx) error { | ||||||
| 		b := tx.Bucket([]byte(dataBucket)) | 		b := tx.Bucket([]byte(dataBucket)) | ||||||
| 		if b == nil { | 		if b == nil { | ||||||
| 			var err error | 			var err error | ||||||
| @@ -275,12 +241,6 @@ func (m *fileStore) set(fd *fileHandle, r *store.Record) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (f *fileStore) Close() error { | func (f *fileStore) Close() error { | ||||||
| 	f.Lock() |  | ||||||
| 	defer f.Unlock() |  | ||||||
| 	for k, v := range f.handles { |  | ||||||
| 		v.db.Close() |  | ||||||
| 		delete(f.handles, k) |  | ||||||
| 	} |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -294,12 +254,13 @@ func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { | |||||||
| 		o(&deleteOptions) | 		o(&deleteOptions) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fd, err := m.getDB(deleteOptions.Database, deleteOptions.Table) | 	db, err := m.getDB(deleteOptions.Database, deleteOptions.Table) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
| 	return m.delete(fd, key) | 	return m.delete(db, key) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { | func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { | ||||||
| @@ -308,10 +269,11 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, | |||||||
| 		o(&readOpts) | 		o(&readOpts) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fd, err := m.getDB(readOpts.Database, readOpts.Table) | 	db, err := m.getDB(readOpts.Database, readOpts.Table) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
| 	var keys []string | 	var keys []string | ||||||
|  |  | ||||||
| @@ -319,7 +281,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, | |||||||
| 	// TODO: do range scan here rather than listing all keys | 	// TODO: do range scan here rather than listing all keys | ||||||
| 	if readOpts.Prefix || readOpts.Suffix { | 	if readOpts.Prefix || readOpts.Suffix { | ||||||
| 		// list the keys | 		// list the keys | ||||||
| 		k := m.list(fd, readOpts.Limit, readOpts.Offset) | 		k := m.list(db, readOpts.Limit, readOpts.Offset) | ||||||
|  |  | ||||||
| 		// check for prefix and suffix | 		// check for prefix and suffix | ||||||
| 		for _, v := range k { | 		for _, v := range k { | ||||||
| @@ -338,7 +300,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, | |||||||
| 	var results []*store.Record | 	var results []*store.Record | ||||||
|  |  | ||||||
| 	for _, k := range keys { | 	for _, k := range keys { | ||||||
| 		r, err := m.get(fd, k) | 		r, err := m.get(db, k) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return results, err | 			return results, err | ||||||
| 		} | 		} | ||||||
| @@ -354,10 +316,11 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { | |||||||
| 		o(&writeOpts) | 		o(&writeOpts) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fd, err := m.getDB(writeOpts.Database, writeOpts.Table) | 	db, err := m.getDB(writeOpts.Database, writeOpts.Table) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
| 	if len(opts) > 0 { | 	if len(opts) > 0 { | ||||||
| 		// Copy the record before applying options, or the incoming record will be mutated | 		// Copy the record before applying options, or the incoming record will be mutated | ||||||
| @@ -378,10 +341,10 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { | |||||||
| 			newRecord.Metadata[k] = v | 			newRecord.Metadata[k] = v | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		return m.set(fd, &newRecord) | 		return m.set(db, &newRecord) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return m.set(fd, r) | 	return m.set(db, r) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (m *fileStore) Options() store.Options { | func (m *fileStore) Options() store.Options { | ||||||
| @@ -395,13 +358,14 @@ func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { | |||||||
| 		o(&listOptions) | 		o(&listOptions) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fd, err := m.getDB(listOptions.Database, listOptions.Table) | 	db, err := m.getDB(listOptions.Database, listOptions.Table) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	defer db.Close() | ||||||
|  |  | ||||||
| 	// TODO apply prefix/suffix in range query | 	// TODO apply prefix/suffix in range query | ||||||
| 	allKeys := m.list(fd, listOptions.Limit, listOptions.Offset) | 	allKeys := m.list(db, listOptions.Limit, listOptions.Offset) | ||||||
|  |  | ||||||
| 	if len(listOptions.Prefix) > 0 { | 	if len(listOptions.Prefix) > 0 { | ||||||
| 		var prefixKeys []string | 		var prefixKeys []string | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user