store: updates for Watcher
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
115ca6a018
commit
ae97023092
@ -2,14 +2,18 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v3/options"
|
||||||
|
"go.unistack.org/micro/v3/util/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ Store = (*noopStore)(nil)
|
var _ Store = (*noopStore)(nil)
|
||||||
|
|
||||||
type noopStore struct {
|
type noopStore struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
watchers map[string]Watcher
|
||||||
funcRead FuncRead
|
funcRead FuncRead
|
||||||
funcWrite FuncWrite
|
funcWrite FuncWrite
|
||||||
funcExists FuncExists
|
funcExists FuncExists
|
||||||
@ -182,3 +186,41 @@ func (n *noopStore) connect(ctx context.Context) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type watcher struct {
|
||||||
|
exit chan bool
|
||||||
|
id string
|
||||||
|
ch chan Event
|
||||||
|
opts WatchOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
|
||||||
|
id, err := id.New()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
wo, err := NewWatchOptions(opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// construct the watcher
|
||||||
|
w := &watcher{
|
||||||
|
exit: make(chan bool),
|
||||||
|
ch: make(chan Event),
|
||||||
|
id: id,
|
||||||
|
opts: wo,
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
m.watchers[w.id] = w
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
return w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Next() (Event, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *watcher) Stop() {
|
||||||
|
}
|
||||||
|
@ -4,9 +4,11 @@ package store
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
ErrWatcherStopped = errors.New("watcher stopped")
|
||||||
// ErrNotConnected is returned when a store is not connected
|
// ErrNotConnected is returned when a store is not connected
|
||||||
ErrNotConnected = errors.New("not conected")
|
ErrNotConnected = errors.New("not conected")
|
||||||
// ErrNotFound is returned when a key doesn't exist
|
// ErrNotFound is returned when a key doesn't exist
|
||||||
@ -43,6 +45,7 @@ type Store interface {
|
|||||||
Disconnect(ctx context.Context) error
|
Disconnect(ctx context.Context) error
|
||||||
// String returns the name of the implementation.
|
// String returns the name of the implementation.
|
||||||
String() string
|
String() string
|
||||||
|
Watch(ctx context.Context, opts ...WatchOption) (Watcher, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
@ -57,3 +60,41 @@ type (
|
|||||||
FuncList func(ctx context.Context, opts ...ListOption) ([]string, error)
|
FuncList func(ctx context.Context, opts ...ListOption) ([]string, error)
|
||||||
HookList func(next FuncList) FuncList
|
HookList func(next FuncList) FuncList
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type EventType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventTypeUnknown = iota
|
||||||
|
EventTypeConnect
|
||||||
|
EventTypeDisconnect
|
||||||
|
EventTypeOpError
|
||||||
|
)
|
||||||
|
|
||||||
|
type Event interface {
|
||||||
|
Timestamp() time.Time
|
||||||
|
Error() error
|
||||||
|
Type() EventType
|
||||||
|
}
|
||||||
|
|
||||||
|
type Watcher interface {
|
||||||
|
// Next is a blocking call
|
||||||
|
Next() (Event, error)
|
||||||
|
// Stop stops the watcher
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
type WatchOption func(*WatchOptions) error
|
||||||
|
|
||||||
|
type WatchOptions struct{}
|
||||||
|
|
||||||
|
func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) {
|
||||||
|
options := WatchOptions{}
|
||||||
|
var err error
|
||||||
|
for _, o := range opts {
|
||||||
|
if err = o(&options); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return options, err
|
||||||
|
}
|
||||||
|
@ -67,16 +67,6 @@ func (w *NamespaceStore) String() string {
|
|||||||
return w.s.String()
|
return w.s.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// type NamespaceWrapper struct{}
|
func (w *NamespaceStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
|
||||||
|
return w.s.Watch(ctx, opts...)
|
||||||
// func NewNamespaceWrapper() Wrapper {
|
|
||||||
// return &NamespaceWrapper{}
|
|
||||||
// }
|
|
||||||
|
|
||||||
/*
|
|
||||||
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
|
|
||||||
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
|
|
||||||
fn(ctx, level, msg, getArgs(args)...)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
Loading…
Reference in New Issue
Block a user