store/file: don't keep connection to boltdb open (#2006)
This commit is contained in:
parent
c1d09f7c41
commit
99ba8dd7d4
43
blob.go
43
blob.go
@ -9,7 +9,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-micro/v3/store"
|
"github.com/micro/go-micro/v3/store"
|
||||||
"github.com/pkg/errors"
|
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -18,19 +17,14 @@ func NewBlobStore() (store.BlobStore, error) {
|
|||||||
// ensure the parent directory exists
|
// ensure the parent directory exists
|
||||||
os.MkdirAll(DefaultDir, 0700)
|
os.MkdirAll(DefaultDir, 0700)
|
||||||
|
|
||||||
// open the connection to the database
|
return &blobStore{}, nil
|
||||||
dbPath := filepath.Join(DefaultDir, "micro.db")
|
|
||||||
db, err := bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second})
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "Error connecting to database")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the blob store
|
type blobStore struct{}
|
||||||
return &blobStore{db}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type blobStore struct {
|
func (b *blobStore) db() (*bolt.DB, error) {
|
||||||
db *bolt.DB
|
dbPath := filepath.Join(DefaultDir, "blob.db")
|
||||||
|
return bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *blobStore) Read(key string, opts ...store.BlobOption) (io.Reader, error) {
|
func (b *blobStore) Read(key string, opts ...store.BlobOption) (io.Reader, error) {
|
||||||
@ -48,6 +42,13 @@ func (b *blobStore) Read(key string, opts ...store.BlobOption) (io.Reader, error
|
|||||||
options.Namespace = "micro"
|
options.Namespace = "micro"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// open a connection to the database
|
||||||
|
db, err := b.db()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
// execute the transaction
|
// execute the transaction
|
||||||
var value []byte
|
var value []byte
|
||||||
readValue := func(tx *bolt.Tx) error {
|
readValue := func(tx *bolt.Tx) error {
|
||||||
@ -65,7 +66,7 @@ func (b *blobStore) Read(key string, opts ...store.BlobOption) (io.Reader, error
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := b.db.View(readValue); err != nil {
|
if err := db.View(readValue); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,8 +89,15 @@ func (b *blobStore) Write(key string, blob io.Reader, opts ...store.BlobOption)
|
|||||||
options.Namespace = "micro"
|
options.Namespace = "micro"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// open a connection to the database
|
||||||
|
db, err := b.db()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
// execute the transaction
|
// execute the transaction
|
||||||
return b.db.Update(func(tx *bolt.Tx) error {
|
return db.Update(func(tx *bolt.Tx) error {
|
||||||
// create the bucket
|
// create the bucket
|
||||||
bucket, err := tx.CreateBucketIfNotExists([]byte(options.Namespace))
|
bucket, err := tx.CreateBucketIfNotExists([]byte(options.Namespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -121,8 +129,15 @@ func (b *blobStore) Delete(key string, opts ...store.BlobOption) error {
|
|||||||
options.Namespace = "micro"
|
options.Namespace = "micro"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// open a connection to the database
|
||||||
|
db, err := b.db()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
// execute the transaction
|
// execute the transaction
|
||||||
return b.db.Update(func(tx *bolt.Tx) error {
|
return db.Update(func(tx *bolt.Tx) error {
|
||||||
// check for the namespaces bucket
|
// check for the namespaces bucket
|
||||||
bucket := tx.Bucket([]byte(options.Namespace))
|
bucket := tx.Bucket([]byte(options.Namespace))
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user