155 lines
3.0 KiB
Go
155 lines
3.0 KiB
Go
package file
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/v3/store"
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
// NewBlobStore returns a blob file store
|
|
func NewBlobStore() (store.BlobStore, error) {
|
|
// ensure the parent directory exists
|
|
os.MkdirAll(DefaultDir, 0700)
|
|
|
|
return &blobStore{}, nil
|
|
}
|
|
|
|
// blobStore provides the Read, Write and Delete blob operations on top of a
// bolt database file. It holds no state of its own: each method opens the
// database through db() and closes it again before returning.
type blobStore struct{}
|
|
|
|
func (b *blobStore) db() (*bolt.DB, error) {
|
|
dbPath := filepath.Join(DefaultDir, "blob.db")
|
|
return bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second})
|
|
}
|
|
|
|
func (b *blobStore) Read(key string, opts ...store.BlobOption) (io.Reader, error) {
|
|
// validate the key
|
|
if len(key) == 0 {
|
|
return nil, store.ErrMissingKey
|
|
}
|
|
|
|
// parse the options
|
|
var options store.BlobOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
if len(options.Namespace) == 0 {
|
|
options.Namespace = "micro"
|
|
}
|
|
|
|
// open a connection to the database
|
|
db, err := b.db()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer db.Close()
|
|
|
|
// execute the transaction
|
|
var value []byte
|
|
readValue := func(tx *bolt.Tx) error {
|
|
// check for the namespaces bucket
|
|
bucket := tx.Bucket([]byte(options.Namespace))
|
|
if bucket == nil {
|
|
return store.ErrNotFound
|
|
}
|
|
|
|
// look for the blob within the bucket
|
|
res := bucket.Get([]byte(key))
|
|
if res == nil {
|
|
return store.ErrNotFound
|
|
}
|
|
|
|
// the res object is only valid for the duration of the blot transaction, see:
|
|
// https://github.com/golang/go/issues/33047
|
|
value = make([]byte, len(res))
|
|
copy(value, res)
|
|
|
|
return nil
|
|
}
|
|
if err := db.View(readValue); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// return the blob
|
|
return bytes.NewBuffer(value), nil
|
|
}
|
|
|
|
func (b *blobStore) Write(key string, blob io.Reader, opts ...store.BlobOption) error {
|
|
// validate the key
|
|
if len(key) == 0 {
|
|
return store.ErrMissingKey
|
|
}
|
|
|
|
// parse the options
|
|
var options store.BlobOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
if len(options.Namespace) == 0 {
|
|
options.Namespace = "micro"
|
|
}
|
|
|
|
// open a connection to the database
|
|
db, err := b.db()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.Close()
|
|
|
|
// execute the transaction
|
|
return db.Update(func(tx *bolt.Tx) error {
|
|
// create the bucket
|
|
bucket, err := tx.CreateBucketIfNotExists([]byte(options.Namespace))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// write to the bucket
|
|
value, err := ioutil.ReadAll(blob)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return bucket.Put([]byte(key), value)
|
|
})
|
|
}
|
|
|
|
func (b *blobStore) Delete(key string, opts ...store.BlobOption) error {
|
|
// validate the key
|
|
if len(key) == 0 {
|
|
return store.ErrMissingKey
|
|
}
|
|
|
|
// parse the options
|
|
var options store.BlobOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
if len(options.Namespace) == 0 {
|
|
options.Namespace = "micro"
|
|
}
|
|
|
|
// open a connection to the database
|
|
db, err := b.db()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.Close()
|
|
|
|
// execute the transaction
|
|
return db.Update(func(tx *bolt.Tx) error {
|
|
// check for the namespaces bucket
|
|
bucket := tx.Bucket([]byte(options.Namespace))
|
|
if bucket == nil {
|
|
return nil
|
|
}
|
|
|
|
return bucket.Delete([]byte(key))
|
|
})
|
|
}
|