Merge remote-tracking branch 'rem/v3' into v3
Some checks failed
pipeline / test (pull_request) Failing after 44s
pipeline / lint (pull_request) Failing after 10m2s

This commit is contained in:
Aleksandr Tolstikhin
2024-12-06 03:36:33 +07:00
125 changed files with 3643 additions and 3301 deletions

View File

@@ -1,197 +0,0 @@
package store
import (
"context"
"sort"
"strings"
"time"
"github.com/patrickmn/go-cache"
)
// NewStore returns a memory store
func NewStore(opts ...Option) Store {
return &memoryStore{
opts: NewOptions(opts...),
store: cache.New(cache.NoExpiration, 5*time.Minute),
}
}
func (m *memoryStore) Connect(ctx context.Context) error {
return nil
}
func (m *memoryStore) Disconnect(ctx context.Context) error {
m.store.Flush()
return nil
}
type memoryStore struct {
store *cache.Cache
opts Options
}
func (m *memoryStore) key(prefix, key string) string {
return prefix + m.opts.Separator + key
}
func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key)
_, found := m.store.Get(key)
if !found {
return ErrNotFound
}
return nil
}
func (m *memoryStore) get(prefix, key string, val interface{}) error {
key = m.key(prefix, key)
r, found := m.store.Get(key)
if !found {
return ErrNotFound
}
buf, ok := r.([]byte)
if !ok {
return ErrNotFound
}
return m.opts.Codec.Unmarshal(buf, val)
}
func (m *memoryStore) delete(prefix, key string) {
key = m.key(prefix, key)
m.store.Delete(key)
}
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
allItems := m.store.Items()
allKeys := make([]string, 0, len(allItems))
for k := range allItems {
if !strings.HasPrefix(k, prefix) {
continue
}
k = strings.TrimPrefix(k, prefix)
if k[0] == '/' {
k = k[1:]
}
allKeys = append(allKeys, k)
}
if limit != 0 || offset != 0 {
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
end := len(allKeys)
if limit > 0 {
calcLimit := int(offset + limit)
if calcLimit < end {
end = calcLimit
}
}
if int(offset) >= end {
return nil
}
return allKeys[offset:end]
}
return allKeys
}
func (m *memoryStore) Init(opts ...Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *memoryStore) String() string {
return "memory"
}
func (m *memoryStore) Name() string {
return m.opts.Name
}
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
options := NewExistsOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.exists(options.Namespace, key)
}
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
options := NewReadOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.get(options.Namespace, key, val)
}
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
options := NewWriteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
if options.TTL == 0 {
options.TTL = cache.NoExpiration
}
key = m.key(options.Namespace, key)
buf, err := m.opts.Codec.Marshal(val)
if err != nil {
return err
}
m.store.Set(key, buf, options.TTL)
return nil
}
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
options := NewDeleteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
m.delete(options.Namespace, key)
return nil
}
func (m *memoryStore) Options() Options {
return m.opts
}
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
options := NewListOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
keys := m.list(options.Namespace, options.Limit, options.Offset)
if len(options.Prefix) > 0 {
var prefixKeys []string
for _, k := range keys {
if strings.HasPrefix(k, options.Prefix) {
prefixKeys = append(prefixKeys, k)
}
}
keys = prefixKeys
}
if len(options.Suffix) > 0 {
var suffixKeys []string
for _, k := range keys {
if strings.HasSuffix(k, options.Suffix) {
suffixKeys = append(suffixKeys, k)
}
}
keys = suffixKeys
}
return keys, nil
}

306
store/memory/memory.go Normal file
View File

@@ -0,0 +1,306 @@
package memory
import (
"context"
"sort"
"strings"
"sync/atomic"
"time"
cache "github.com/patrickmn/go-cache"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/store"
)
// NewStore returns a memory store
func NewStore(opts ...store.Option) store.Store {
return &memoryStore{
opts: store.NewOptions(opts...),
store: cache.New(cache.NoExpiration, 5*time.Minute),
}
}
func (m *memoryStore) Connect(ctx context.Context) error {
if m.opts.LazyConnect {
return nil
}
return m.connect(ctx)
}
func (m *memoryStore) Disconnect(ctx context.Context) error {
m.store.Flush()
return nil
}
type memoryStore struct {
funcRead store.FuncRead
funcWrite store.FuncWrite
funcExists store.FuncExists
funcList store.FuncList
funcDelete store.FuncDelete
store *cache.Cache
opts store.Options
isConnected atomic.Int32
}
func (m *memoryStore) key(prefix, key string) string {
return prefix + m.opts.Separator + key
}
func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key)
_, found := m.store.Get(key)
if !found {
return store.ErrNotFound
}
return nil
}
func (m *memoryStore) get(prefix, key string, val interface{}) error {
key = m.key(prefix, key)
r, found := m.store.Get(key)
if !found {
return store.ErrNotFound
}
buf, ok := r.([]byte)
if !ok {
return store.ErrNotFound
}
return m.opts.Codec.Unmarshal(buf, val)
}
func (m *memoryStore) delete(prefix, key string) {
key = m.key(prefix, key)
m.store.Delete(key)
}
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
allItems := m.store.Items()
allKeys := make([]string, 0, len(allItems))
for k := range allItems {
if !strings.HasPrefix(k, prefix) {
continue
}
k = strings.TrimPrefix(k, prefix)
if k[0] == '/' {
k = k[1:]
}
allKeys = append(allKeys, k)
}
if limit != 0 || offset != 0 {
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] })
end := len(allKeys)
if limit > 0 {
calcLimit := int(offset + limit)
if calcLimit < end {
end = calcLimit
}
}
if int(offset) >= end {
return nil
}
return allKeys[offset:end]
}
return allKeys
}
func (m *memoryStore) Init(opts ...store.Option) error {
for _, o := range opts {
o(&m.opts)
}
m.funcRead = m.fnRead
m.funcWrite = m.fnWrite
m.funcExists = m.fnExists
m.funcList = m.fnList
m.funcDelete = m.fnDelete
m.opts.Hooks.EachNext(func(hook options.Hook) {
switch h := hook.(type) {
case store.HookRead:
m.funcRead = h(m.funcRead)
case store.HookWrite:
m.funcWrite = h(m.funcWrite)
case store.HookExists:
m.funcExists = h(m.funcExists)
case store.HookList:
m.funcList = h(m.funcList)
case store.HookDelete:
m.funcDelete = h(m.funcDelete)
}
})
return nil
}
func (m *memoryStore) String() string {
return "memory"
}
func (m *memoryStore) Name() string {
return m.opts.Name
}
func (m *memoryStore) Live() bool {
return true
}
func (m *memoryStore) Ready() bool {
return true
}
func (m *memoryStore) Health() bool {
return true
}
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
if m.opts.LazyConnect {
if err := m.connect(ctx); err != nil {
return err
}
}
return m.funcExists(ctx, key, opts...)
}
func (m *memoryStore) fnExists(ctx context.Context, key string, opts ...store.ExistsOption) error {
options := store.NewExistsOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.exists(options.Namespace, key)
}
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
if m.opts.LazyConnect {
if err := m.connect(ctx); err != nil {
return err
}
}
return m.funcRead(ctx, key, val, opts...)
}
func (m *memoryStore) fnRead(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
options := store.NewReadOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.get(options.Namespace, key, val)
}
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
if m.opts.LazyConnect {
if err := m.connect(ctx); err != nil {
return err
}
}
return m.funcWrite(ctx, key, val, opts...)
}
func (m *memoryStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
options := store.NewWriteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
if options.TTL == 0 {
options.TTL = cache.NoExpiration
}
key = m.key(options.Namespace, key)
buf, err := m.opts.Codec.Marshal(val)
if err != nil {
return err
}
m.store.Set(key, buf, options.TTL)
return nil
}
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
if m.opts.LazyConnect {
if err := m.connect(ctx); err != nil {
return err
}
}
return m.funcDelete(ctx, key, opts...)
}
func (m *memoryStore) fnDelete(ctx context.Context, key string, opts ...store.DeleteOption) error {
options := store.NewDeleteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
m.delete(options.Namespace, key)
return nil
}
func (m *memoryStore) Options() store.Options {
return m.opts
}
func (m *memoryStore) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
if m.opts.LazyConnect {
if err := m.connect(ctx); err != nil {
return nil, err
}
}
return m.funcList(ctx, opts...)
}
func (m *memoryStore) fnList(ctx context.Context, opts ...store.ListOption) ([]string, error) {
options := store.NewListOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
keys := m.list(options.Namespace, options.Limit, options.Offset)
if len(options.Prefix) > 0 {
var prefixKeys []string
for _, k := range keys {
if strings.HasPrefix(k, options.Prefix) {
prefixKeys = append(prefixKeys, k)
}
}
keys = prefixKeys
}
if len(options.Suffix) > 0 {
var suffixKeys []string
for _, k := range keys {
if strings.HasSuffix(k, options.Suffix) {
suffixKeys = append(suffixKeys, k)
}
}
keys = suffixKeys
}
return keys, nil
}
func (m *memoryStore) connect(ctx context.Context) error {
m.isConnected.CompareAndSwap(0, 1)
return nil
}
func (m *memoryStore) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) {
return &watcher{}, nil
}
type watcher struct{}
func (w *watcher) Next() (store.Event, error) {
return nil, nil
}
func (w *watcher) Stop() {
}

View File

@@ -1,4 +1,4 @@
package store_test
package memory
import (
"context"
@@ -8,8 +8,41 @@ import (
"go.unistack.org/micro/v3/store"
)
type testHook struct {
f bool
}
func (t *testHook) Exists(fn store.FuncExists) store.FuncExists {
return func(ctx context.Context, key string, opts ...store.ExistsOption) error {
t.f = true
return fn(ctx, key, opts...)
}
}
func TestHook(t *testing.T) {
h := &testHook{}
s := NewStore(store.Hooks(store.HookExists(h.Exists)))
if err := s.Init(); err != nil {
t.Fatal(err)
}
if err := s.Write(context.TODO(), "test", nil); err != nil {
t.Error(err)
}
if err := s.Exists(context.TODO(), "test"); err != nil {
t.Fatal(err)
}
if !h.f {
t.Fatal("hook not works")
}
}
func TestMemoryReInit(t *testing.T) {
s := store.NewStore(store.Namespace("aaa"))
s := NewStore(store.Namespace("aaa"))
if err := s.Init(store.Namespace("")); err != nil {
t.Fatal(err)
}
@@ -19,7 +52,7 @@ func TestMemoryReInit(t *testing.T) {
}
func TestMemoryBasic(t *testing.T) {
s := store.NewStore()
s := NewStore()
if err := s.Init(); err != nil {
t.Fatal(err)
}
@@ -27,7 +60,7 @@ func TestMemoryBasic(t *testing.T) {
}
func TestMemoryPrefix(t *testing.T) {
s := store.NewStore()
s := NewStore()
if err := s.Init(store.Namespace("some-prefix")); err != nil {
t.Fatal(err)
}
@@ -35,7 +68,7 @@ func TestMemoryPrefix(t *testing.T) {
}
func TestMemoryNamespace(t *testing.T) {
s := store.NewStore()
s := NewStore()
if err := s.Init(store.Namespace("some-namespace")); err != nil {
t.Fatal(err)
}
@@ -43,7 +76,7 @@ func TestMemoryNamespace(t *testing.T) {
}
func TestMemoryNamespacePrefix(t *testing.T) {
s := store.NewStore()
s := NewStore()
if err := s.Init(store.Namespace("some-namespace")); err != nil {
t.Fatal(err)
}

238
store/noop.go Normal file
View File

@@ -0,0 +1,238 @@
package store
import (
"context"
"sync"
"sync/atomic"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/util/id"
)
var _ Store = (*noopStore)(nil)
type noopStore struct {
mu sync.Mutex
watchers map[string]Watcher
funcRead FuncRead
funcWrite FuncWrite
funcExists FuncExists
funcList FuncList
funcDelete FuncDelete
opts Options
isConnected atomic.Int32
}
func (n *noopStore) Live() bool {
return true
}
func (n *noopStore) Ready() bool {
return true
}
func (n *noopStore) Health() bool {
return true
}
func NewStore(opts ...Option) *noopStore {
options := NewOptions(opts...)
return &noopStore{opts: options}
}
func (n *noopStore) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
n.funcRead = n.fnRead
n.funcWrite = n.fnWrite
n.funcExists = n.fnExists
n.funcList = n.fnList
n.funcDelete = n.fnDelete
n.opts.Hooks.EachNext(func(hook options.Hook) {
switch h := hook.(type) {
case HookRead:
n.funcRead = h(n.funcRead)
case HookWrite:
n.funcWrite = h(n.funcWrite)
case HookExists:
n.funcExists = h(n.funcExists)
case HookList:
n.funcList = h(n.funcList)
case HookDelete:
n.funcDelete = h(n.funcDelete)
}
})
return nil
}
func (n *noopStore) Connect(ctx context.Context) error {
if n.opts.LazyConnect {
return nil
}
return n.connect(ctx)
}
func (n *noopStore) Disconnect(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
if n.opts.LazyConnect {
if err := n.connect(ctx); err != nil {
return err
}
}
return n.funcRead(ctx, key, val, opts...)
}
func (n *noopStore) fnRead(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
if n.opts.LazyConnect {
if err := n.connect(ctx); err != nil {
return err
}
}
return n.funcDelete(ctx, key, opts...)
}
func (n *noopStore) fnDelete(ctx context.Context, key string, opts ...DeleteOption) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
if n.opts.LazyConnect {
if err := n.connect(ctx); err != nil {
return err
}
}
return n.funcExists(ctx, key, opts...)
}
func (n *noopStore) fnExists(ctx context.Context, key string, opts ...ExistsOption) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
if n.opts.LazyConnect {
if err := n.connect(ctx); err != nil {
return err
}
}
return n.funcWrite(ctx, key, val, opts...)
}
func (n *noopStore) fnWrite(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
if n.opts.LazyConnect {
if err := n.connect(ctx); err != nil {
return nil, err
}
}
return n.funcList(ctx, opts...)
}
func (n *noopStore) fnList(ctx context.Context, opts ...ListOption) ([]string, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return nil, nil
}
func (n *noopStore) Name() string {
return n.opts.Name
}
func (n *noopStore) String() string {
return "noop"
}
func (n *noopStore) Options() Options {
return n.opts
}
func (n *noopStore) connect(ctx context.Context) error {
if n.isConnected.CompareAndSwap(0, 1) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return nil
}
type watcher struct {
exit chan bool
id string
ch chan Event
opts WatchOptions
}
func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
id, err := id.New()
if err != nil {
return nil, err
}
wo, err := NewWatchOptions(opts...)
if err != nil {
return nil, err
}
// construct the watcher
w := &watcher{
exit: make(chan bool),
ch: make(chan Event),
id: id,
opts: wo,
}
m.mu.Lock()
m.watchers[w.id] = w
m.mu.Unlock()
return w, nil
}
func (w *watcher) Next() (Event, error) {
return nil, nil
}
func (w *watcher) Stop() {
}

35
store/noop_test.go Normal file
View File

@@ -0,0 +1,35 @@
package store
import (
"context"
"testing"
)
type testHook struct {
f bool
}
func (t *testHook) Exists(fn FuncExists) FuncExists {
return func(ctx context.Context, key string, opts ...ExistsOption) error {
t.f = true
return fn(ctx, key, opts...)
}
}
func TestHook(t *testing.T) {
h := &testHook{}
s := NewStore(Hooks(HookExists(h.Exists)))
if err := s.Init(); err != nil {
t.Fatal(err)
}
if err := s.Exists(context.TODO(), "test"); err != nil {
t.Fatal(err)
}
if !h.f {
t.Fatal("hook not works")
}
}

View File

@@ -9,6 +9,7 @@ import (
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/tracer"
)
@@ -38,6 +39,10 @@ type Options struct {
// Wrappers []Wrapper
// Timeout specifies timeout duration for all operations
Timeout time.Duration
// Hooks can be run before/after store Read/List/Write/Exists/Delete
Hooks options.Hooks
// LazyConnect creates a connection when using store
LazyConnect bool
}
// NewOptions creates options struct
@@ -129,6 +134,13 @@ func Timeout(td time.Duration) Option {
}
}
// LazyConnect initialize connection only when needed
func LazyConnect(b bool) Option {
return func(o *Options) {
o.LazyConnect = b
}
}
// Addrs contains the addresses or other connection information of the backing storage.
// For example, an etcd implementation would contain the nodes of the cluster.
// A SQL implementation could contain one or more connection strings.
@@ -144,6 +156,10 @@ type ReadOptions struct {
Context context.Context
// Namespace holds namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// NewReadOptions fills ReadOptions struct with opts slice
@@ -158,6 +174,20 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
// ReadOption sets values in ReadOptions
type ReadOption func(r *ReadOptions)
// ReadTimeout pass timeout to ReadOptions
func ReadTimeout(td time.Duration) ReadOption {
return func(o *ReadOptions) {
o.Timeout = td
}
}
// ReadName pass name to ReadOptions
func ReadName(name string) ReadOption {
return func(o *ReadOptions) {
o.Name = name
}
}
// ReadContext pass context.Context to ReadOptions
func ReadContext(ctx context.Context) ReadOption {
return func(o *ReadOptions) {
@@ -180,6 +210,10 @@ type WriteOptions struct {
Metadata metadata.Metadata
// Namespace holds namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
// TTL specifies key TTL
TTL time.Duration
}
@@ -224,12 +258,30 @@ func WriteNamespace(ns string) WriteOption {
}
}
// WriteName pass name to WriteOptions
func WriteName(name string) WriteOption {
return func(o *WriteOptions) {
o.Name = name
}
}
// WriteTimeout pass timeout to WriteOptions
func WriteTimeout(td time.Duration) WriteOption {
return func(o *WriteOptions) {
o.Timeout = td
}
}
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
// Context holds external options
Context context.Context
// Namespace holds namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// NewDeleteOptions fills DeleteOptions struct with opts slice
@@ -258,14 +310,32 @@ func DeleteNamespace(ns string) DeleteOption {
}
}
// DeleteName pass name to DeleteOptions
func DeleteName(name string) DeleteOption {
return func(o *DeleteOptions) {
o.Name = name
}
}
// DeleteTimeout pass timeout to DeleteOptions
func DeleteTimeout(td time.Duration) DeleteOption {
return func(o *DeleteOptions) {
o.Timeout = td
}
}
// ListOptions configures an individual List operation
type ListOptions struct {
Context context.Context
Prefix string
Suffix string
Namespace string
Limit uint
Offset uint
// Name holds mnemonic name
Name string
Limit uint
Offset uint
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// NewListOptions fills ListOptions struct with opts slice
@@ -322,12 +392,23 @@ func ListNamespace(ns string) ListOption {
}
}
// ListTimeout pass timeout to ListOptions
func ListTimeout(td time.Duration) ListOption {
return func(o *ListOptions) {
o.Timeout = td
}
}
// ExistsOptions holds options for Exists method
type ExistsOptions struct {
// Context holds external options
Context context.Context
// Namespace contains namespace
Namespace string
// Name holds mnemonic name
Name string
// Timeout specifies max timeout for operation
Timeout time.Duration
}
// ExistsOption specifies Exists call options
@@ -358,11 +439,23 @@ func ExistsNamespace(ns string) ExistsOption {
}
}
/*
// WrapStore adds a store Wrapper to a list of options passed into the store
func WrapStore(w Wrapper) Option {
return func(o *Options) {
o.Wrappers = append(o.Wrappers, w)
// ExistsName pass name to exist options
func ExistsName(name string) ExistsOption {
return func(o *ExistsOptions) {
o.Name = name
}
}
// ExistsTimeout timeout to ListOptions
func ExistsTimeout(td time.Duration) ExistsOption {
return func(o *ExistsOptions) {
o.Timeout = td
}
}
// Hooks sets hook runs before action
func Hooks(h ...options.Hook) Option {
return func(o *Options) {
o.Hooks = append(o.Hooks, h...)
}
}
*/

View File

@@ -1,9 +1,10 @@
// Package store is an interface for distributed data storage.
package store // import "go.unistack.org/micro/v3/store"
package store
import (
"context"
"errors"
"time"
)
type EventType int
@@ -15,6 +16,9 @@ const (
)
var (
ErrWatcherStopped = errors.New("watcher stopped")
// ErrNotConnected is returned when a store is not connected
ErrNotConnected = errors.New("not conected")
// ErrNotFound is returned when a key doesn't exist
ErrNotFound = errors.New("not found")
// ErrInvalidKey is returned when a key has empty or have invalid format
@@ -32,6 +36,7 @@ type Event interface {
// Store is a data storage interface
type Store interface {
// Name returns store name
Name() string
// Init initialises the store
Init(opts ...Option) error
@@ -53,6 +58,65 @@ type Store interface {
Disconnect(ctx context.Context) error
// String returns the name of the implementation.
String() string
// Watch returns events watcher
Watch(ctx context.Context, opts ...WatchOption) (Watcher, error)
// Live returns store liveness
Live() bool
// Ready returns store readiness
Ready() bool
// Health returns store health
Health() bool
}
type (
FuncExists func(ctx context.Context, key string, opts ...ExistsOption) error
HookExists func(next FuncExists) FuncExists
FuncRead func(ctx context.Context, key string, val interface{}, opts ...ReadOption) error
HookRead func(next FuncRead) FuncRead
FuncWrite func(ctx context.Context, key string, val interface{}, opts ...WriteOption) error
HookWrite func(next FuncWrite) FuncWrite
FuncDelete func(ctx context.Context, key string, opts ...DeleteOption) error
HookDelete func(next FuncDelete) FuncDelete
FuncList func(ctx context.Context, opts ...ListOption) ([]string, error)
HookList func(next FuncList) FuncList
)
type EventType int
const (
EventTypeUnknown = iota
EventTypeConnect
EventTypeDisconnect
EventTypeOpError
)
type Event interface {
Timestamp() time.Time
Error() error
Type() EventType
}
type Watcher interface {
// Next is a blocking call
Next() (Event, error)
// Stop stops the watcher
Stop()
}
type WatchOption func(*WatchOptions) error
type WatchOptions struct{}
func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) {
options := WatchOptions{}
var err error
for _, o := range opts {
if err = o(&options); err != nil {
break
}
}
return options, err
}
type Watcher interface {

View File

@@ -67,16 +67,18 @@ func (w *NamespaceStore) String() string {
return w.s.String()
}
// type NamespaceWrapper struct{}
// func NewNamespaceWrapper() Wrapper {
// return &NamespaceWrapper{}
// }
/*
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
fn(ctx, level, msg, getArgs(args)...)
}
func (w *NamespaceStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
return w.s.Watch(ctx, opts...)
}
func (w *NamespaceStore) Live() bool {
return w.s.Live()
}
func (w *NamespaceStore) Ready() bool {
return w.s.Ready()
}
func (w *NamespaceStore) Health() bool {
return w.s.Health()
}
*/