remove store s3 plugin
This commit is contained in:
@@ -1,52 +0,0 @@
|
|||||||
package s3
|
|
||||||
|
|
||||||
import "crypto/tls"
|
|
||||||
|
|
||||||
// Options used to configure the s3 blob store
|
|
||||||
type Options struct {
|
|
||||||
Endpoint string
|
|
||||||
Region string
|
|
||||||
AccessKeyID string
|
|
||||||
SecretAccessKey string
|
|
||||||
Secure bool
|
|
||||||
TLSConfig *tls.Config
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option configures one or more options
|
|
||||||
type Option func(o *Options)
|
|
||||||
|
|
||||||
// Endpoint sets the endpoint option
|
|
||||||
func Endpoint(e string) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Endpoint = e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Region sets the region option
|
|
||||||
func Region(r string) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Region = r
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Credentials sets the AccessKeyID and SecretAccessKey options
|
|
||||||
func Credentials(id, secret string) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.AccessKeyID = id
|
|
||||||
o.SecretAccessKey = secret
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insecure sets the secure option to false. It is enabled by default.
|
|
||||||
func Insecure() Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Secure = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TLSConfig sets the tls config for the client
|
|
||||||
func TLSConfig(c *tls.Config) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.TLSConfig = c
|
|
||||||
}
|
|
||||||
}
|
|
174
store/s3/s3.go
174
store/s3/s3.go
@@ -1,174 +0,0 @@
|
|||||||
package s3
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"regexp"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v3/store"
|
|
||||||
"github.com/minio/minio-go/v7"
|
|
||||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var keyRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
|
|
||||||
|
|
||||||
// NewBlobStore returns an initialized s3 blob store
|
|
||||||
func NewBlobStore(opts ...Option) (store.BlobStore, error) {
|
|
||||||
// parse the options
|
|
||||||
options := Options{Secure: true}
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
minioOpts := &minio.Options{
|
|
||||||
Secure: options.Secure,
|
|
||||||
}
|
|
||||||
if len(options.AccessKeyID) > 0 || len(options.SecretAccessKey) > 0 {
|
|
||||||
minioOpts.Creds = credentials.NewStaticV4(options.AccessKeyID, options.SecretAccessKey, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
// configure the transport to use custom tls config if provided
|
|
||||||
if options.TLSConfig != nil {
|
|
||||||
ts, err := minio.DefaultTransport(options.Secure)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "Error setting up s3 blob store transport")
|
|
||||||
}
|
|
||||||
ts.TLSClientConfig = options.TLSConfig
|
|
||||||
minioOpts.Transport = ts
|
|
||||||
}
|
|
||||||
|
|
||||||
// initialize minio client
|
|
||||||
client, err := minio.New(options.Endpoint, minioOpts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "Error connecting to s3 blob store")
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the blob store
|
|
||||||
return &s3{client, &options}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type s3 struct {
|
|
||||||
client *minio.Client
|
|
||||||
options *Options
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *s3) Read(key string, opts ...store.BlobOption) (io.Reader, error) {
|
|
||||||
// validate the key
|
|
||||||
if len(key) == 0 {
|
|
||||||
return nil, store.ErrMissingKey
|
|
||||||
}
|
|
||||||
|
|
||||||
// make the key safe for use with s3
|
|
||||||
key = keyRegex.ReplaceAllString(key, "-")
|
|
||||||
|
|
||||||
// parse the options
|
|
||||||
var options store.BlobOptions
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
if len(options.Namespace) == 0 {
|
|
||||||
options.Namespace = "micro"
|
|
||||||
}
|
|
||||||
|
|
||||||
// lookup the object
|
|
||||||
res, err := s.client.GetObject(
|
|
||||||
context.TODO(), // context
|
|
||||||
options.Namespace, // bucket name
|
|
||||||
key, // object name
|
|
||||||
minio.GetObjectOptions{}, // options
|
|
||||||
)
|
|
||||||
|
|
||||||
// scaleway will return a 404 if the bucket doesn't exist
|
|
||||||
if verr, ok := err.(minio.ErrorResponse); ok && verr.StatusCode == http.StatusNotFound {
|
|
||||||
return nil, store.ErrNotFound
|
|
||||||
} else if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the object info, if an error is returned the object could not be found
|
|
||||||
_, err = res.Stat()
|
|
||||||
if verr, ok := err.(minio.ErrorResponse); ok && verr.StatusCode == http.StatusNotFound {
|
|
||||||
return nil, store.ErrNotFound
|
|
||||||
} else if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// return the result
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *s3) Write(key string, blob io.Reader, opts ...store.BlobOption) error {
|
|
||||||
// validate the key
|
|
||||||
if len(key) == 0 {
|
|
||||||
return store.ErrMissingKey
|
|
||||||
}
|
|
||||||
|
|
||||||
// make the key safe for use with s3
|
|
||||||
key = keyRegex.ReplaceAllString(key, "-")
|
|
||||||
|
|
||||||
// parse the options
|
|
||||||
var options store.BlobOptions
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
if len(options.Namespace) == 0 {
|
|
||||||
options.Namespace = "micro"
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the bucket exists, create it if not
|
|
||||||
if exists, err := s.client.BucketExists(context.TODO(), options.Namespace); err != nil {
|
|
||||||
return err
|
|
||||||
} else if !exists {
|
|
||||||
opts := minio.MakeBucketOptions{Region: s.options.Region}
|
|
||||||
if err := s.client.MakeBucket(context.TODO(), options.Namespace, opts); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the bytes so we can determine the length
|
|
||||||
b, err := ioutil.ReadAll(blob)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// create the object in the bucket
|
|
||||||
_, err = s.client.PutObject(
|
|
||||||
context.TODO(), // context
|
|
||||||
options.Namespace, // bucket name
|
|
||||||
key, // object name
|
|
||||||
bytes.NewBuffer(b), // reader
|
|
||||||
int64(len(b)), // length of object
|
|
||||||
minio.PutObjectOptions{}, // options
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *s3) Delete(key string, opts ...store.BlobOption) error {
|
|
||||||
// validate the key
|
|
||||||
if len(key) == 0 {
|
|
||||||
return store.ErrMissingKey
|
|
||||||
}
|
|
||||||
|
|
||||||
// make the key safe for use with s3
|
|
||||||
key = keyRegex.ReplaceAllString(key, "-")
|
|
||||||
|
|
||||||
// parse the options
|
|
||||||
var options store.BlobOptions
|
|
||||||
for _, o := range opts {
|
|
||||||
o(&options)
|
|
||||||
}
|
|
||||||
if len(options.Namespace) == 0 {
|
|
||||||
options.Namespace = "micro"
|
|
||||||
}
|
|
||||||
|
|
||||||
err := s.client.RemoveObject(
|
|
||||||
context.TODO(), // context
|
|
||||||
options.Namespace, // bucket name
|
|
||||||
key, // object name
|
|
||||||
minio.RemoveObjectOptions{}, // options
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
@@ -1,119 +0,0 @@
|
|||||||
package s3
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v3/store"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestBlobStore(t *testing.T) {
|
|
||||||
region := os.Getenv("S3_BLOB_STORE_REGION")
|
|
||||||
if len(region) == 0 {
|
|
||||||
t.Skipf("Missing required config S3_BLOB_STORE_REGION")
|
|
||||||
}
|
|
||||||
|
|
||||||
endpoint := os.Getenv("S3_BLOB_STORE_ENDPOINT")
|
|
||||||
if len(endpoint) == 0 {
|
|
||||||
t.Skipf("Missing required config S3_BLOB_STORE_ENDPOINT")
|
|
||||||
}
|
|
||||||
|
|
||||||
accessKey := os.Getenv("S3_BLOB_STORE_ACCESS_KEY")
|
|
||||||
if len(accessKey) == 0 {
|
|
||||||
t.Skipf("Missing required config S3_BLOB_STORE_ACCESS_KEY")
|
|
||||||
}
|
|
||||||
|
|
||||||
secretKey := os.Getenv("S3_BLOB_STORE_SECRET_KEY")
|
|
||||||
if len(secretKey) == 0 {
|
|
||||||
t.Skipf("Missing required config S3_BLOB_STORE_SECRET_KEY")
|
|
||||||
}
|
|
||||||
|
|
||||||
blob, err := NewBlobStore(
|
|
||||||
Region(region),
|
|
||||||
Endpoint(endpoint),
|
|
||||||
Credentials(accessKey, secretKey),
|
|
||||||
)
|
|
||||||
assert.NotNilf(t, blob, "Blob should not be nil")
|
|
||||||
assert.Nilf(t, err, "Error should be nil")
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("ReadMissingKey", func(t *testing.T) {
|
|
||||||
res, err := blob.Read("")
|
|
||||||
assert.Equal(t, store.ErrMissingKey, err, "Error should be missing key")
|
|
||||||
assert.Nil(t, res, "Result should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ReadNotFound", func(t *testing.T) {
|
|
||||||
res, err := blob.Read("foo")
|
|
||||||
assert.Equal(t, store.ErrNotFound, err, "Error should be not found")
|
|
||||||
assert.Nil(t, res, "Result should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("WriteMissingKey", func(t *testing.T) {
|
|
||||||
buf := bytes.NewBuffer([]byte("HelloWorld"))
|
|
||||||
err := blob.Write("", buf)
|
|
||||||
assert.Equal(t, store.ErrMissingKey, err, "Error should be missing key")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("WriteValid", func(t *testing.T) {
|
|
||||||
buf := bytes.NewBuffer([]byte("world"))
|
|
||||||
err := blob.Write("hello", buf)
|
|
||||||
assert.Nilf(t, err, "Error should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ReadValid", func(t *testing.T) {
|
|
||||||
val, err := blob.Read("hello")
|
|
||||||
assert.Nilf(t, err, "Error should be nil")
|
|
||||||
assert.NotNilf(t, val, "Value should not be nil")
|
|
||||||
|
|
||||||
if val != nil {
|
|
||||||
bytes, err := ioutil.ReadAll(val)
|
|
||||||
assert.Nilf(t, err, "Error should be nil")
|
|
||||||
assert.Equal(t, "world", string(bytes), "Value should be world")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ReadIncorrectNamespace", func(t *testing.T) {
|
|
||||||
val, err := blob.Read("hello", store.BlobNamespace("bar"))
|
|
||||||
assert.Equal(t, store.ErrNotFound, err, "Error should be not found")
|
|
||||||
assert.Nil(t, val, "Value should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ReadCorrectNamespace", func(t *testing.T) {
|
|
||||||
val, err := blob.Read("hello", store.BlobNamespace("micro"))
|
|
||||||
assert.Nil(t, err, "Error should be nil")
|
|
||||||
assert.NotNilf(t, val, "Value should not be nil")
|
|
||||||
|
|
||||||
if val != nil {
|
|
||||||
bytes, err := ioutil.ReadAll(val)
|
|
||||||
assert.Nilf(t, err, "Error should be nil")
|
|
||||||
assert.Equal(t, "world", string(bytes), "Value should be world")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("DeleteIncorrectNamespace", func(t *testing.T) {
|
|
||||||
err := blob.Delete("hello", store.BlobNamespace("bar"))
|
|
||||||
assert.Nil(t, err, "Error should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("DeleteCorrectNamespaceIncorrectKey", func(t *testing.T) {
|
|
||||||
err := blob.Delete("world", store.BlobNamespace("micro"))
|
|
||||||
assert.Nil(t, err, "Error should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("DeleteCorrectNamespace", func(t *testing.T) {
|
|
||||||
err := blob.Delete("hello", store.BlobNamespace("micro"))
|
|
||||||
assert.Nil(t, err, "Error should be nil")
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ReadDeletedKey", func(t *testing.T) {
|
|
||||||
res, err := blob.Read("hello", store.BlobNamespace("micro"))
|
|
||||||
assert.Equal(t, store.ErrNotFound, err, "Error should be not found")
|
|
||||||
assert.Nil(t, res, "Result should be nil")
|
|
||||||
})
|
|
||||||
}
|
|
Reference in New Issue
Block a user