Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
83694864ad | |||
0e452f4dc7 | |||
560cb01564 | |||
6efc858dbd | |||
392db01d40 | |||
b1a3401b9b |
3
go.mod
3
go.mod
@@ -4,6 +4,7 @@ go 1.15
|
||||
|
||||
require (
|
||||
github.com/gabriel-vasile/mimetype v1.1.2
|
||||
github.com/google/uuid v1.1.5
|
||||
github.com/minio/minio-go/v7 v7.0.7
|
||||
github.com/unistack-org/micro/v3 v3.1.8
|
||||
github.com/unistack-org/micro/v3 v3.2.6
|
||||
)
|
||||
|
6
go.sum
6
go.sum
@@ -37,6 +37,7 @@ github.com/google/uuid v1.1.5 h1:kxhtnfFVi+rYdOALN0B3k9UT86zVJKfBimRaciULW4I=
|
||||
github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA=
|
||||
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
@@ -89,8 +90,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/unistack-org/micro/v3 v3.1.8 h1:ma9tBX7h0QyByDZ+vrY3+kqJoy/Ro9vMx2iTLjrDaNI=
|
||||
github.com/unistack-org/micro/v3 v3.1.8/go.mod h1:J8XxJj4Pqa3Ee0a4biRRtut7UwTlfBq8QRe+s4PKGS0=
|
||||
github.com/unistack-org/micro/v3 v3.2.6 h1:LfgoC86oF5tgsB5JVe8L0yF4swG4FGH0fVBP+FrG6fw=
|
||||
github.com/unistack-org/micro/v3 v3.2.6/go.mod h1:J8XxJj4Pqa3Ee0a4biRRtut7UwTlfBq8QRe+s4PKGS0=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
@@ -167,6 +168,7 @@ gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
|
||||
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
91
s3.go
91
s3.go
@@ -21,10 +21,11 @@ import (
|
||||
var keyRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||
|
||||
type s3Store struct {
|
||||
client *minio.Client
|
||||
opts store.Options
|
||||
endpoint string
|
||||
mopts *minio.Options
|
||||
client *minio.Client
|
||||
opts store.Options
|
||||
endpoint string
|
||||
mopts *minio.Options
|
||||
connected bool
|
||||
}
|
||||
|
||||
func getBucket(ctx context.Context) string {
|
||||
@@ -43,15 +44,18 @@ func NewStore(opts ...store.Option) store.Store {
|
||||
}
|
||||
|
||||
func (s *s3Store) Connect(ctx context.Context) error {
|
||||
if s.client == nil {
|
||||
client, err := minio.New(s.endpoint, s.mopts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error connecting to store: %w", err)
|
||||
}
|
||||
|
||||
s.client = client
|
||||
if s.connected {
|
||||
return nil
|
||||
}
|
||||
|
||||
client, err := minio.New(s.endpoint, s.mopts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error connecting to store: %w", err)
|
||||
}
|
||||
|
||||
s.client = client
|
||||
s.connected = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -65,7 +69,8 @@ func (s *s3Store) Init(opts ...store.Option) error {
|
||||
}
|
||||
|
||||
var akey, skey string
|
||||
var endpoint string
|
||||
region := "us-east-1"
|
||||
endpoint := s.endpoint
|
||||
|
||||
if s.opts.Context != nil {
|
||||
if v, ok := s.opts.Context.Value(accessKey{}).(string); ok && v != "" {
|
||||
@@ -76,38 +81,39 @@ func (s *s3Store) Init(opts ...store.Option) error {
|
||||
}
|
||||
if v, ok := s.opts.Context.Value(endpointKey{}).(string); ok && v != "" {
|
||||
endpoint = v
|
||||
var secure bool
|
||||
|
||||
if strings.HasPrefix(endpoint, "https://") || s.opts.TLSConfig != nil {
|
||||
secure = true
|
||||
}
|
||||
|
||||
if u, err := url.Parse(endpoint); err == nil && u.Host != "" {
|
||||
endpoint = u.Host
|
||||
}
|
||||
|
||||
ts, err := minio.DefaultTransport(secure)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init error: %w", err)
|
||||
}
|
||||
if s.opts.TLSConfig != nil {
|
||||
ts.TLSClientConfig = s.opts.TLSConfig
|
||||
}
|
||||
|
||||
s.mopts.Transport = ts
|
||||
s.mopts.Secure = secure
|
||||
s.endpoint = endpoint
|
||||
}
|
||||
if v, ok := s.opts.Context.Value(regionKey{}).(string); ok && v != "" {
|
||||
region = v
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if len(akey) > 0 && len(skey) > 0 {
|
||||
s.mopts.Creds = creds.NewStaticV2(akey, skey, "")
|
||||
}
|
||||
|
||||
if len(endpoint) == 0 {
|
||||
return fmt.Errorf("missing Endpoint option")
|
||||
}
|
||||
|
||||
var secure bool
|
||||
|
||||
if strings.HasPrefix(endpoint, "https://") || s.opts.TLSConfig != nil {
|
||||
secure = true
|
||||
}
|
||||
|
||||
if u, err := url.Parse(endpoint); err == nil && u.Host != "" {
|
||||
endpoint = u.Host
|
||||
}
|
||||
|
||||
ts, err := minio.DefaultTransport(secure)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init error: %w", err)
|
||||
}
|
||||
|
||||
if s.opts.TLSConfig != nil {
|
||||
ts.TLSClientConfig = s.opts.TLSConfig
|
||||
}
|
||||
s.mopts.Transport = ts
|
||||
|
||||
s.endpoint = endpoint
|
||||
s.mopts.Region = region
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -182,13 +188,8 @@ func (s *s3Store) Write(ctx context.Context, key string, val interface{}, opts .
|
||||
bucket = options.Namespace
|
||||
}
|
||||
|
||||
var region string
|
||||
mputopts := minio.PutObjectOptions{}
|
||||
|
||||
if v, ok := s.opts.Context.Value(regionKey{}).(string); ok && v != "" {
|
||||
region = v
|
||||
}
|
||||
|
||||
if options.Context != nil {
|
||||
if v, ok := options.Context.Value(contentTypeKey{}).(string); ok && v != "" {
|
||||
mputopts.ContentType = v
|
||||
@@ -207,7 +208,7 @@ func (s *s3Store) Write(ctx context.Context, key string, val interface{}, opts .
|
||||
if ok, err := s.client.BucketExists(ctx, bucket); err != nil {
|
||||
return err
|
||||
} else if !ok {
|
||||
opts := minio.MakeBucketOptions{Region: region}
|
||||
opts := minio.MakeBucketOptions{Region: s.mopts.Region}
|
||||
if err := s.client.MakeBucket(ctx, bucket, opts); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -298,3 +299,7 @@ func (s *s3Store) List(ctx context.Context, opts ...store.ListOption) ([]string,
|
||||
func (s *s3Store) String() string {
|
||||
return "s3"
|
||||
}
|
||||
|
||||
func (s *s3Store) Name() string {
|
||||
return s.opts.Name
|
||||
}
|
||||
|
47
s3_test.go
47
s3_test.go
@@ -5,6 +5,10 @@ import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/unistack-org/micro/v3"
|
||||
)
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
@@ -24,35 +28,49 @@ func TestStore(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.Init(); err != nil {
|
||||
t.Fatalf("double init test failed: %v", err)
|
||||
}
|
||||
|
||||
if err := s.Connect(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.Connect(ctx); err != nil {
|
||||
t.Fatalf("double connect test failed: %v", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := s.Disconnect(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
svc := micro.NewService(micro.Stores(s))
|
||||
if err := svc.Init(); err != nil {
|
||||
t.Fatalf("service init failed: %v", err)
|
||||
}
|
||||
|
||||
val := []byte("test")
|
||||
key := "key"
|
||||
|
||||
if err := s.Write(ctx, key, val, WriteBucket("micro-store-s3"), ContentType("text/plain")); err != nil {
|
||||
bucket := "micro-store-s3-" + uuid.New().String()
|
||||
if err := s.Write(ctx, key, val, WriteBucket(bucket), ContentType("text/plain")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
val = nil
|
||||
|
||||
if err := s.Exists(ctx, key, ExistsBucket("micro-store-s3")); err != nil {
|
||||
if err := s.Exists(ctx, key, ExistsBucket(bucket)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := s.Read(ctx, key, &val, ReadBucket("micro-store-s3")); err != nil {
|
||||
if err := s.Read(ctx, key, &val, ReadBucket(bucket)); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !bytes.Equal(val, []byte("test")) {
|
||||
t.Fatalf("read bytes are not equal %s != %s", val, "test")
|
||||
}
|
||||
|
||||
names, err := s.List(ctx, ListBucket("micro-store-s3"))
|
||||
names, err := s.List(ctx, ListBucket(bucket))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -68,4 +86,25 @@ func TestStore(t *testing.T) {
|
||||
t.Fatalf("key not found in %v", names)
|
||||
}
|
||||
|
||||
objectsCh := make(chan minio.ObjectInfo)
|
||||
minioClient := s.(*s3Store).client
|
||||
// Send object names that are needed to be removed to objectsCh
|
||||
go func() {
|
||||
defer close(objectsCh)
|
||||
// List all objects from a bucket-name with a matching prefix.
|
||||
for object := range minioClient.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Recursive: true}) {
|
||||
if object.Err != nil {
|
||||
t.Fatal(object.Err)
|
||||
}
|
||||
objectsCh <- object
|
||||
}
|
||||
}()
|
||||
|
||||
opts := minio.RemoveObjectsOptions{
|
||||
GovernanceBypass: true,
|
||||
}
|
||||
|
||||
for rErr := range minioClient.RemoveObjects(context.Background(), bucket, objectsCh, opts) {
|
||||
t.Fatalf("Error detected during deletion: %v", rErr)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user