From ae97023092894aee1378401e0dee73b01d3e59cb Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 1 Dec 2024 19:54:38 +0300 Subject: [PATCH] store: updates for Watcher Signed-off-by: Vasiliy Tolstov --- store/noop.go | 42 ++++++++++++++++++++++++++++++++++++++++++ store/store.go | 41 +++++++++++++++++++++++++++++++++++++++++ store/wrapper.go | 14 ++------------ 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/store/noop.go b/store/noop.go index 7b113dfa..83524901 100644 --- a/store/noop.go +++ b/store/noop.go @@ -2,14 +2,18 @@ package store import ( "context" + "sync" "sync/atomic" "go.unistack.org/micro/v3/options" + "go.unistack.org/micro/v3/util/id" ) var _ Store = (*noopStore)(nil) type noopStore struct { + mu sync.Mutex + watchers map[string]Watcher funcRead FuncRead funcWrite FuncWrite funcExists FuncExists @@ -182,3 +186,41 @@ func (n *noopStore) connect(ctx context.Context) error { return nil } + +type watcher struct { + exit chan bool + id string + ch chan Event + opts WatchOptions +} + +func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + id, err := id.New() + if err != nil { + return nil, err + } + wo, err := NewWatchOptions(opts...) + if err != nil { + return nil, err + } + // construct the watcher + w := &watcher{ + exit: make(chan bool), + ch: make(chan Event), + id: id, + opts: wo, + } + + m.mu.Lock() + m.watchers[w.id] = w + m.mu.Unlock() + + return w, nil +} + +func (w *watcher) Next() (Event, error) { + return nil, nil +} + +func (w *watcher) Stop() { +} diff --git a/store/store.go b/store/store.go index 25530cc4..8235be5d 100644 --- a/store/store.go +++ b/store/store.go @@ -4,9 +4,11 @@ package store import ( "context" "errors" + "time" ) var ( + ErrWatcherStopped = errors.New("watcher stopped") // ErrNotConnected is returned when a store is not connected ErrNotConnected = errors.New("not conected") // ErrNotFound is returned when a key doesn't exist @@ -43,6 +45,7 @@ type Store interface { Disconnect(ctx context.Context) error // String returns the name of the implementation. String() string + Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) } type ( @@ -57,3 +60,41 @@ type ( FuncList func(ctx context.Context, opts ...ListOption) ([]string, error) HookList func(next FuncList) FuncList ) + +type EventType int + +const ( + EventTypeUnknown = iota + EventTypeConnect + EventTypeDisconnect + EventTypeOpError +) + +type Event interface { + Timestamp() time.Time + Error() error + Type() EventType +} + +type Watcher interface { + // Next is a blocking call + Next() (Event, error) + // Stop stops the watcher + Stop() +} + +type WatchOption func(*WatchOptions) error + +type WatchOptions struct{} + +func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) { + options := WatchOptions{} + var err error + for _, o := range opts { + if err = o(&options); err != nil { + break + } + } + + return options, err +} diff --git a/store/wrapper.go b/store/wrapper.go index 84c43026..a9221bae 100644 --- a/store/wrapper.go +++ b/store/wrapper.go @@ -67,16 +67,6 @@ func (w *NamespaceStore) String() string { return w.s.String() } -// type NamespaceWrapper struct{} - -// func NewNamespaceWrapper() Wrapper { -// return &NamespaceWrapper{} -// } - -/* -func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc { - return func(ctx context.Context, level Level, msg string, args ...interface{}) { - fn(ctx, level, msg, getArgs(args)...) - } +func (w *NamespaceStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + return w.s.Watch(ctx, opts...) } -*/