300
s3.go
Normal file
300
s3.go
Normal file
@@ -0,0 +1,300 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/gabriel-vasile/mimetype"
|
||||
"github.com/minio/minio-go/v7"
|
||||
creds "github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/unistack-org/micro/v3/store"
|
||||
)
|
||||
|
||||
var keyRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||
|
||||
type s3Store struct {
|
||||
client *minio.Client
|
||||
opts store.Options
|
||||
endpoint string
|
||||
mopts *minio.Options
|
||||
}
|
||||
|
||||
func getBucket(ctx context.Context) string {
|
||||
if ctx != nil {
|
||||
if v, ok := ctx.Value(bucketKey{}).(string); ok && v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// NewStore create new s3 store
|
||||
func NewStore(opts ...store.Option) store.Store {
|
||||
options := store.NewOptions(opts...)
|
||||
return &s3Store{opts: options, mopts: &minio.Options{}}
|
||||
}
|
||||
|
||||
func (s *s3Store) Connect(ctx context.Context) error {
|
||||
if s.client == nil {
|
||||
client, err := minio.New(s.endpoint, s.mopts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error connecting to store: %w", err)
|
||||
}
|
||||
|
||||
s.client = client
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *s3Store) Disconnect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *s3Store) Init(opts ...store.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&s.opts)
|
||||
}
|
||||
|
||||
var akey, skey string
|
||||
var endpoint string
|
||||
|
||||
if s.opts.Context != nil {
|
||||
if v, ok := s.opts.Context.Value(accessKey{}).(string); ok && v != "" {
|
||||
akey = v
|
||||
}
|
||||
if v, ok := s.opts.Context.Value(secretKey{}).(string); ok && v != "" {
|
||||
skey = v
|
||||
}
|
||||
if v, ok := s.opts.Context.Value(endpointKey{}).(string); ok && v != "" {
|
||||
endpoint = v
|
||||
}
|
||||
}
|
||||
|
||||
if len(akey) > 0 && len(skey) > 0 {
|
||||
s.mopts.Creds = creds.NewStaticV2(akey, skey, "")
|
||||
}
|
||||
|
||||
if len(endpoint) == 0 {
|
||||
return fmt.Errorf("missing Endpoint option")
|
||||
}
|
||||
|
||||
var secure bool
|
||||
|
||||
if strings.HasPrefix(endpoint, "https://") || s.opts.TLSConfig != nil {
|
||||
secure = true
|
||||
}
|
||||
|
||||
if u, err := url.Parse(endpoint); err == nil && u.Host != "" {
|
||||
endpoint = u.Host
|
||||
}
|
||||
|
||||
ts, err := minio.DefaultTransport(secure)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init error: %w", err)
|
||||
}
|
||||
|
||||
if s.opts.TLSConfig != nil {
|
||||
ts.TLSClientConfig = s.opts.TLSConfig
|
||||
}
|
||||
s.mopts.Transport = ts
|
||||
|
||||
s.endpoint = endpoint
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *s3Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
|
||||
options := store.NewReadOptions(opts...)
|
||||
|
||||
if len(key) == 0 {
|
||||
return store.ErrInvalidKey
|
||||
}
|
||||
|
||||
bucket := getBucket(options.Context)
|
||||
if bucket == "" {
|
||||
bucket = options.Namespace
|
||||
}
|
||||
|
||||
key = keyRegex.ReplaceAllString(key, "-")
|
||||
|
||||
rsp, err := s.client.GetObject(ctx, bucket, key, minio.GetObjectOptions{})
|
||||
if verr, ok := err.(minio.ErrorResponse); ok && verr.StatusCode == http.StatusNotFound {
|
||||
return store.ErrNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = rsp.Stat()
|
||||
if verr, ok := err.(minio.ErrorResponse); ok && verr.StatusCode == http.StatusNotFound {
|
||||
return store.ErrNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch val.(type) {
|
||||
case io.ReadCloser:
|
||||
val = rsp
|
||||
}
|
||||
|
||||
defer rsp.Close()
|
||||
buf, err := ioutil.ReadAll(rsp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.opts.Codec.Unmarshal(buf, val)
|
||||
}
|
||||
|
||||
func (s *s3Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
|
||||
options := store.NewWriteOptions(opts...)
|
||||
var r io.Reader
|
||||
|
||||
if len(key) == 0 {
|
||||
return store.ErrInvalidKey
|
||||
}
|
||||
|
||||
switch v := val.(type) {
|
||||
case io.Reader:
|
||||
r = v
|
||||
case []byte:
|
||||
r = bytes.NewReader(v)
|
||||
default:
|
||||
buf, err := s.opts.Codec.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r = bytes.NewReader(buf)
|
||||
}
|
||||
|
||||
key = keyRegex.ReplaceAllString(key, "-")
|
||||
|
||||
bucket := getBucket(options.Context)
|
||||
if bucket == "" {
|
||||
bucket = options.Namespace
|
||||
}
|
||||
|
||||
var region string
|
||||
mputopts := minio.PutObjectOptions{}
|
||||
|
||||
if v, ok := s.opts.Context.Value(regionKey{}).(string); ok && v != "" {
|
||||
region = v
|
||||
}
|
||||
|
||||
if options.Context != nil {
|
||||
if v, ok := options.Context.Value(contentTypeKey{}).(string); ok && v != "" {
|
||||
mputopts.ContentType = v
|
||||
}
|
||||
}
|
||||
|
||||
// slow path
|
||||
if mputopts.ContentType == "" {
|
||||
w := bytes.NewBuffer(nil)
|
||||
tr := io.TeeReader(r, w)
|
||||
mime, _ := mimetype.DetectReader(tr)
|
||||
mputopts.ContentType = mime.String()
|
||||
r = io.MultiReader(w, r)
|
||||
}
|
||||
|
||||
if ok, err := s.client.BucketExists(ctx, bucket); err != nil {
|
||||
return err
|
||||
} else if !ok {
|
||||
opts := minio.MakeBucketOptions{Region: region}
|
||||
if err := s.client.MakeBucket(ctx, bucket, opts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
_, err := s.client.PutObject(ctx, bucket, key, r, int64(-1), mputopts)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *s3Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
|
||||
// validate the key
|
||||
if len(key) == 0 {
|
||||
return store.ErrInvalidKey
|
||||
}
|
||||
|
||||
// make the key safe for use with s3
|
||||
key = keyRegex.ReplaceAllString(key, "-")
|
||||
|
||||
options := store.NewDeleteOptions(opts...)
|
||||
|
||||
bucket := getBucket(options.Context)
|
||||
if bucket == "" {
|
||||
bucket = options.Namespace
|
||||
}
|
||||
|
||||
if len(bucket) > 0 {
|
||||
return s.client.RemoveObject(
|
||||
ctx, // context
|
||||
bucket, // bucket name
|
||||
filepath.Join(options.Namespace, key), // object name
|
||||
minio.RemoveObjectOptions{}, // options
|
||||
)
|
||||
}
|
||||
|
||||
return s.client.RemoveObject(
|
||||
ctx, // context
|
||||
options.Namespace, // bucket name
|
||||
key, // object name
|
||||
minio.RemoveObjectOptions{}, // options
|
||||
)
|
||||
}
|
||||
|
||||
func (s *s3Store) Options() store.Options {
|
||||
return s.opts
|
||||
}
|
||||
|
||||
func (s *s3Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||
options := store.NewExistsOptions(opts...)
|
||||
|
||||
bucket := getBucket(options.Context)
|
||||
if bucket == "" {
|
||||
bucket = options.Namespace
|
||||
}
|
||||
|
||||
_, err := s.client.StatObject(ctx, bucket, key, minio.StatObjectOptions{})
|
||||
if verr, ok := err.(minio.ErrorResponse); ok && verr.StatusCode == http.StatusNotFound {
|
||||
return store.ErrNotFound
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *s3Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
|
||||
options := store.NewListOptions(opts...)
|
||||
|
||||
bucket := getBucket(options.Context)
|
||||
if bucket == "" {
|
||||
bucket = options.Namespace
|
||||
}
|
||||
|
||||
var recursive bool
|
||||
if options.Context != nil {
|
||||
if v, ok := options.Context.Value(listRecursiveKey{}).(bool); ok {
|
||||
recursive = v
|
||||
}
|
||||
}
|
||||
|
||||
var names []string
|
||||
for oinfo := range s.client.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: options.Prefix, Recursive: recursive}) {
|
||||
names = append(names, oinfo.Key)
|
||||
}
|
||||
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func (s *s3Store) String() string {
|
||||
return "s3"
|
||||
}
|
||||
Reference in New Issue
Block a user