From e6aa5b967bbde042b4cb60a963228e69bdba72b1 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 10 Dec 2020 22:08:17 +0300 Subject: [PATCH] store: refactor interface Signed-off-by: Vasiliy Tolstov --- api/server/acme/certmagic/storage.go | 25 ++++++++--------------- store/noop.go | 11 +++++++--- store/options.go | 24 ++++++++++++++++++++-- store/store.go | 30 ++++++++++++++-------------- util/scope/scope.go | 10 +++++----- util/sync/manager.go | 10 +++------- util/sync/sync.go | 12 +++++++---- util/token/basic/basic.go | 13 ++++-------- 8 files changed, 73 insertions(+), 62 deletions(-) diff --git a/api/server/acme/certmagic/storage.go b/api/server/acme/certmagic/storage.go index 241b630e..32c07a04 100644 --- a/api/server/acme/certmagic/storage.go +++ b/api/server/acme/certmagic/storage.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/gob" "errors" - "fmt" "path" "strings" "time" @@ -48,25 +47,19 @@ func (s *storage) Store(key string, value []byte) error { if err := e.Encode(f); err != nil { return err } - r := &store.Record{ - Key: key, - Value: buf.Bytes(), - } - return s.store.Write(s.store.Options().Context, r) + return s.store.Write(s.store.Options().Context, key, buf.Bytes()) } func (s *storage) Load(key string) ([]byte, error) { if !s.Exists(key) { return nil, certmagic.ErrNotExist(errors.New(key + " doesn't exist")) } - records, err := s.store.Read(s.store.Options().Context, key) + var val []byte + err := s.store.Read(s.store.Options().Context, key, val) if err != nil { return nil, err } - if len(records) != 1 { - return nil, fmt.Errorf("ACME Storage: multiple records matched key %s", key) - } - b := bytes.NewBuffer(records[0].Value) + b := bytes.NewBuffer(val) d := gob.NewDecoder(b) var f File err = d.Decode(&f) @@ -81,7 +74,7 @@ func (s *storage) Delete(key string) error { } func (s *storage) Exists(key string) bool { - if _, err := s.store.Read(s.store.Options().Context, key); err != nil { + if err := s.store.Read(s.store.Options().Context, key, nil); err != nil { return false } return true @@ -116,14 +109,12 @@ func (s *storage) List(prefix string, recursive bool) ([]string, error) { } func (s *storage) Stat(key string) (certmagic.KeyInfo, error) { - records, err := s.store.Read(s.store.Options().Context, key) + var val []byte + err := s.store.Read(s.store.Options().Context, key, val) if err != nil { return certmagic.KeyInfo{}, err } - if len(records) != 1 { - return certmagic.KeyInfo{}, fmt.Errorf("ACME Storage: multiple records matched key %s", key) - } - b := bytes.NewBuffer(records[0].Value) + b := bytes.NewBuffer(val) d := gob.NewDecoder(b) var f File err = d.Decode(&f) diff --git a/store/noop.go b/store/noop.go index 0c89e599..711f3564 100644 --- a/store/noop.go +++ b/store/noop.go @@ -29,12 +29,17 @@ func (n *noopStore) String() string { } // Read reads store value by key -func (n *noopStore) Read(ctx context.Context, key string, opts ...ReadOption) ([]*Record, error) { - return []*Record{}, nil +func (n *noopStore) Exists(ctx context.Context, key string) error { + return ErrNotFound +} + +// Read reads store value by key +func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { + return ErrNotFound } // Write writes store record -func (n *noopStore) Write(ctx context.Context, r *Record, opts ...WriteOption) error { +func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { return nil } diff --git a/store/options.go b/store/options.go index 3eac5231..434a941b 100644 --- a/store/options.go +++ b/store/options.go @@ -4,7 +4,9 @@ import ( "context" "time" + "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" ) // Options contains configuration for the Store @@ -17,6 +19,8 @@ type Options struct { Database string // Table is analag for a table in database backends or a key prefix in KV backends Table string + // Codec that used for marshal/unmarshal value + Codec codec.Codec // Logger the logger Logger logger.Logger // Context should contain all implementation specific options, using context.WithValue. @@ -45,6 +49,13 @@ func Context(ctx context.Context) Option { } } +// Codec sets the codec +func Codec(c codec.Codec) Option { + return func(o *Options) { + o.Codec = c + } +} + // Logger sets the logger func Logger(l logger.Logger) Option { return func(o *Options) { @@ -130,11 +141,13 @@ func ReadOffset(o uint) ReadOption { // WriteOptions configures an individual Write operation // If Expiry and TTL are set TTL takes precedence type WriteOptions struct { - Database, Table string + Database string + Table string // Expiry is the time the record expires Expiry time.Time // TTL is the time until the record expires - TTL time.Duration + TTL time.Duration + Metadata metadata.Metadata } // WriteOption sets values in WriteOptions @@ -162,6 +175,13 @@ func WriteTTL(d time.Duration) WriteOption { } } +// WriteMetadata add metadata.Metadata +func WriteMetadata(md metadata.Metadata) WriteOption { + return func(w *WriteOptions) { + w.Metadata = metadata.Copy(md) + } +} + // DeleteOptions configures an individual Delete operation type DeleteOptions struct { Database, Table string diff --git a/store/store.go b/store/store.go index 3f6f231a..91a984f8 100644 --- a/store/store.go +++ b/store/store.go @@ -5,7 +5,8 @@ package store import ( "context" "errors" - "time" + + "github.com/unistack-org/micro/v3/metadata" ) var ( @@ -23,10 +24,12 @@ type Store interface { Connect(ctx context.Context) error // Options allows you to view the current options. Options() Options - // Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error. - Read(ctx context.Context, key string, opts ...ReadOption) ([]*Record, error) - // Write() writes a record to the store, and returns an error if the record was not written. - Write(ctx context.Context, r *Record, opts ...WriteOption) error + // Exists check that key exists in store + Exists(ctx context.Context, key string) error + // Read reads a single key name to provided value with optional ReadOptions + Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error + // Write writes a value to key name to the store with optional WriteOption + Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error // Delete removes the record with the corresponding key from the store. Delete(ctx context.Context, key string, opts ...DeleteOption) error // List returns any keys that match, or an empty list with no error if none matched. @@ -37,14 +40,11 @@ type Store interface { String() string } -// Record is an item stored or retrieved from a Store -type Record struct { - // The key to store the record - Key string `json:"key"` - // The value within the record - Value []byte `json:"value"` - // Any associated metadata for indexing - Metadata map[string]interface{} `json:"metadata"` - // Time to expire a record: TODO: change to timestamp - Expiry time.Duration `json:"expiry,omitempty"` +// Value is an item stored or retrieved from a Store +// may be used in store implementations to provide metadata +type Value struct { + // Data holds underline struct + Data interface{} `json:"data"` + // Metadata associated with data for indexing + Metadata metadata.Metadata `json:"metadata"` } diff --git a/util/scope/scope.go b/util/scope/scope.go index ad01334e..f66f3ced 100644 --- a/util/scope/scope.go +++ b/util/scope/scope.go @@ -24,14 +24,14 @@ func (s *Scope) Options() store.Options { return o } -func (s *Scope) Read(ctx context.Context, key string, opts ...store.ReadOption) ([]*store.Record, error) { +func (s *Scope) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { key = fmt.Sprintf("%v/%v", s.prefix, key) - return s.Store.Read(ctx, key, opts...) + return s.Store.Read(ctx, key, val, opts...) } -func (s *Scope) Write(ctx context.Context, r *store.Record, opts ...store.WriteOption) error { - r.Key = fmt.Sprintf("%v/%v", s.prefix, r.Key) - return s.Store.Write(ctx, r, opts...) +func (s *Scope) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { + key = fmt.Sprintf("%v/%v", s.prefix, key) + return s.Store.Write(ctx, key, val, opts...) } func (s *Scope) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { diff --git a/util/sync/manager.go b/util/sync/manager.go index c9dc6938..f03f5a30 100644 --- a/util/sync/manager.go +++ b/util/sync/manager.go @@ -41,16 +41,12 @@ func (c *syncStore) processQueue(index int) { if !ir.expiresAt.IsZero() && time.Now().After(ir.expiresAt) { continue } - nr := &store.Record{ - Key: ir.key, - } - nr.Value = make([]byte, len(ir.value)) - copy(nr.Value, ir.value) + var opts []store.WriteOption if !ir.expiresAt.IsZero() { - nr.Expiry = time.Until(ir.expiresAt) + opts = append(opts, store.WriteTTL(ir.expiresAt.Sub(time.Now()))) } // Todo = internal queue also has to hold the corresponding store.WriteOptions - if err := c.syncOpts.Stores[index+1].Write(c.storeOpts.Context, nr); err != nil { + if err := c.syncOpts.Stores[index+1].Write(c.storeOpts.Context, ir.key, ir.value, opts...); err != nil { // some error, so queue for retry and bail q.PushBack(ir) return diff --git a/util/sync/sync.go b/util/sync/sync.go index 1667e6a5..ec20b95c 100644 --- a/util/sync/sync.go +++ b/util/sync/sync.go @@ -96,12 +96,16 @@ func (c *syncStore) List(ctx context.Context, opts ...store.ListOption) ([]strin return c.syncOpts.Stores[0].List(ctx, opts...) } -func (c *syncStore) Read(ctx context.Context, key string, opts ...store.ReadOption) ([]*store.Record, error) { - return c.syncOpts.Stores[0].Read(ctx, key, opts...) +func (c *syncStore) Exists(ctx context.Context, key string) error { + return c.syncOpts.Stores[0].Exists(ctx, key) } -func (c *syncStore) Write(ctx context.Context, r *store.Record, opts ...store.WriteOption) error { - return c.syncOpts.Stores[0].Write(ctx, r, opts...) +func (c *syncStore) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { + return c.syncOpts.Stores[0].Read(ctx, key, val, opts...) +} + +func (c *syncStore) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { + return c.syncOpts.Stores[0].Write(ctx, key, val, opts...) } // Delete removes a key from the sync diff --git a/util/token/basic/basic.go b/util/token/basic/basic.go index 77b89546..057ac72b 100644 --- a/util/token/basic/basic.go +++ b/util/token/basic/basic.go @@ -47,11 +47,7 @@ func (b *Basic) Generate(acc *auth.Account, opts ...token.GenerateOption) (*toke // write to the store key := uuid.New().String() - err = b.store.Write(context.Background(), &store.Record{ - Key: fmt.Sprintf("%v%v", StorePrefix, key), - Value: bytes, - Expiry: options.Expiry, - }) + err = b.store.Write(context.Background(), fmt.Sprintf("%v%v", StorePrefix, key), bytes, store.WriteTTL(options.Expiry)) if err != nil { return nil, err } @@ -67,17 +63,16 @@ func (b *Basic) Generate(acc *auth.Account, opts ...token.GenerateOption) (*toke // Inspect a token func (b *Basic) Inspect(t string) (*auth.Account, error) { // lookup the token in the store - recs, err := b.store.Read(context.Background(), StorePrefix+t) + var val []byte + err := b.store.Read(context.Background(), StorePrefix+t, val) if err == store.ErrNotFound { return nil, token.ErrInvalidToken } else if err != nil { return nil, err } - bytes := recs[0].Value - // unmarshal the bytes var acc *auth.Account - if err := json.Unmarshal(bytes, &acc); err != nil { + if err := json.Unmarshal(val, &acc); err != nil { return nil, err }