package file import ( "bytes" "io" "io/ioutil" "os" "path/filepath" "time" "github.com/micro/go-micro/v3/store" bolt "go.etcd.io/bbolt" ) // NewBlobStore returns a blob file store func NewBlobStore() (store.BlobStore, error) { // ensure the parent directory exists os.MkdirAll(DefaultDir, 0700) return &blobStore{}, nil } type blobStore struct{} func (b *blobStore) db() (*bolt.DB, error) { dbPath := filepath.Join(DefaultDir, "blob.db") return bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) } func (b *blobStore) Read(key string, opts ...store.BlobOption) (io.Reader, error) { // validate the key if len(key) == 0 { return nil, store.ErrMissingKey } // parse the options var options store.BlobOptions for _, o := range opts { o(&options) } if len(options.Namespace) == 0 { options.Namespace = "micro" } // open a connection to the database db, err := b.db() if err != nil { return nil, err } defer db.Close() // execute the transaction var value []byte readValue := func(tx *bolt.Tx) error { // check for the namespaces bucket bucket := tx.Bucket([]byte(options.Namespace)) if bucket == nil { return store.ErrNotFound } // look for the blob within the bucket res := bucket.Get([]byte(key)) if res == nil { return store.ErrNotFound } // the res object is only valid for the duration of the blot transaction, see: // https://github.com/golang/go/issues/33047 value = make([]byte, len(res)) copy(value, res) return nil } if err := db.View(readValue); err != nil { return nil, err } // return the blob return bytes.NewBuffer(value), nil } func (b *blobStore) Write(key string, blob io.Reader, opts ...store.BlobOption) error { // validate the key if len(key) == 0 { return store.ErrMissingKey } // parse the options var options store.BlobOptions for _, o := range opts { o(&options) } if len(options.Namespace) == 0 { options.Namespace = "micro" } // open a connection to the database db, err := b.db() if err != nil { return err } defer db.Close() // execute the transaction return db.Update(func(tx *bolt.Tx) error { // create the bucket bucket, err := tx.CreateBucketIfNotExists([]byte(options.Namespace)) if err != nil { return err } // write to the bucket value, err := ioutil.ReadAll(blob) if err != nil { return err } return bucket.Put([]byte(key), value) }) } func (b *blobStore) Delete(key string, opts ...store.BlobOption) error { // validate the key if len(key) == 0 { return store.ErrMissingKey } // parse the options var options store.BlobOptions for _, o := range opts { o(&options) } if len(options.Namespace) == 0 { options.Namespace = "micro" } // open a connection to the database db, err := b.db() if err != nil { return err } defer db.Close() // execute the transaction return db.Update(func(tx *bolt.Tx) error { // check for the namespaces bucket bucket := tx.Bucket([]byte(options.Namespace)) if bucket == nil { return nil } return bucket.Delete([]byte(key)) }) }