store: updates for Watcher
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -2,14 +2,18 @@ package store | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"sync" | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v3/options" | 	"go.unistack.org/micro/v3/options" | ||||||
|  | 	"go.unistack.org/micro/v3/util/id" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var _ Store = (*noopStore)(nil) | var _ Store = (*noopStore)(nil) | ||||||
|  |  | ||||||
| type noopStore struct { | type noopStore struct { | ||||||
|  | 	mu          sync.Mutex | ||||||
|  | 	watchers    map[string]Watcher | ||||||
| 	funcRead    FuncRead | 	funcRead    FuncRead | ||||||
| 	funcWrite   FuncWrite | 	funcWrite   FuncWrite | ||||||
| 	funcExists  FuncExists | 	funcExists  FuncExists | ||||||
| @@ -182,3 +186,41 @@ func (n *noopStore) connect(ctx context.Context) error { | |||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type watcher struct { | ||||||
|  | 	exit chan bool | ||||||
|  | 	id   string | ||||||
|  | 	ch   chan Event | ||||||
|  | 	opts WatchOptions | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *noopStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { | ||||||
|  | 	id, err := id.New() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	wo, err := NewWatchOptions(opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	// construct the watcher | ||||||
|  | 	w := &watcher{ | ||||||
|  | 		exit: make(chan bool), | ||||||
|  | 		ch:   make(chan Event), | ||||||
|  | 		id:   id, | ||||||
|  | 		opts: wo, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	m.mu.Lock() | ||||||
|  | 	m.watchers[w.id] = w | ||||||
|  | 	m.mu.Unlock() | ||||||
|  |  | ||||||
|  | 	return w, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *watcher) Next() (Event, error) { | ||||||
|  | 	return nil, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (w *watcher) Stop() { | ||||||
|  | } | ||||||
|   | |||||||
| @@ -4,9 +4,11 @@ package store | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"time" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ( | var ( | ||||||
|  | 	ErrWatcherStopped = errors.New("watcher stopped") | ||||||
| 	// ErrNotConnected is returned when a store is not connected | 	// ErrNotConnected is returned when a store is not connected | ||||||
| 	ErrNotConnected = errors.New("not conected") | 	ErrNotConnected = errors.New("not conected") | ||||||
| 	// ErrNotFound is returned when a key doesn't exist | 	// ErrNotFound is returned when a key doesn't exist | ||||||
| @@ -43,6 +45,7 @@ type Store interface { | |||||||
| 	Disconnect(ctx context.Context) error | 	Disconnect(ctx context.Context) error | ||||||
| 	// String returns the name of the implementation. | 	// String returns the name of the implementation. | ||||||
| 	String() string | 	String() string | ||||||
|  | 	Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) | ||||||
| } | } | ||||||
|  |  | ||||||
| type ( | type ( | ||||||
| @@ -57,3 +60,41 @@ type ( | |||||||
| 	FuncList   func(ctx context.Context, opts ...ListOption) ([]string, error) | 	FuncList   func(ctx context.Context, opts ...ListOption) ([]string, error) | ||||||
| 	HookList   func(next FuncList) FuncList | 	HookList   func(next FuncList) FuncList | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | type EventType int | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	EventTypeUnknown = iota | ||||||
|  | 	EventTypeConnect | ||||||
|  | 	EventTypeDisconnect | ||||||
|  | 	EventTypeOpError | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type Event interface { | ||||||
|  | 	Timestamp() time.Time | ||||||
|  | 	Error() error | ||||||
|  | 	Type() EventType | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Watcher interface { | ||||||
|  | 	// Next is a blocking call | ||||||
|  | 	Next() (Event, error) | ||||||
|  | 	// Stop stops the watcher | ||||||
|  | 	Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type WatchOption func(*WatchOptions) error | ||||||
|  |  | ||||||
|  | type WatchOptions struct{} | ||||||
|  |  | ||||||
|  | func NewWatchOptions(opts ...WatchOption) (WatchOptions, error) { | ||||||
|  | 	options := WatchOptions{} | ||||||
|  | 	var err error | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		if err = o(&options); err != nil { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return options, err | ||||||
|  | } | ||||||
|   | |||||||
| @@ -67,16 +67,6 @@ func (w *NamespaceStore) String() string { | |||||||
| 	return w.s.String() | 	return w.s.String() | ||||||
| } | } | ||||||
|  |  | ||||||
| // type NamespaceWrapper struct{} | func (w *NamespaceStore) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { | ||||||
|  | 	return w.s.Watch(ctx, opts...) | ||||||
| // func NewNamespaceWrapper() Wrapper { |  | ||||||
| //	return &NamespaceWrapper{} |  | ||||||
| // } |  | ||||||
|  |  | ||||||
| /* |  | ||||||
| func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc { |  | ||||||
| 	return func(ctx context.Context, level Level, msg string, args ...interface{}) { |  | ||||||
| 		fn(ctx, level, msg, getArgs(args)...) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
| */ |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user