From b0320753226066cf0482da88a09242c70052be42 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 1 Dec 2024 19:48:47 +0300 Subject: [PATCH] add store.Watcher interface Signed-off-by: Vasiliy Tolstov --- event.go | 42 +++++++++++++------ redis.go | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 139 insertions(+), 24 deletions(-) diff --git a/event.go b/event.go index cdfe75e..20251a4 100644 --- a/event.go +++ b/event.go @@ -4,19 +4,20 @@ import ( "context" "errors" "net" - "sync/atomic" + "time" goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/store" ) type eventHook struct { - connected *atomic.Bool + s *Store } var _ goredis.Hook = (*eventHook)(nil) -func newEventHook(connected *atomic.Bool) *eventHook { - return &eventHook{connected: connected} +func newEventHook(s *Store) *eventHook { + return &eventHook{s: s} } func (h *eventHook) DialHook(hook goredis.DialHook) goredis.DialHook { @@ -24,11 +25,16 @@ func (h *eventHook) DialHook(hook goredis.DialHook) goredis.DialHook { conn, err := hook(ctx, network, addr) if err != nil { if !isRedisError(err) { - h.connected.Store(false) + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) } - h.connected.Store(true) } else { - h.connected.Store(true) + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } } return conn, err } @@ -39,11 +45,16 @@ func (h *eventHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { err := hook(ctx, cmd) if err != nil { if !isRedisError(err) { - h.connected.Store(false) + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) } - h.connected.Store(true) } else { - h.connected.Store(true) + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } } return err } @@ -54,11 +65,16 @@ func (h *eventHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredi err := hook(ctx, cmds) if err != nil { if !isRedisError(err) { - h.connected.Store(false) + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) } - h.connected.Store(true) } else { - h.connected.Store(true) + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } } return err } diff --git a/redis.go b/redis.go index dec322d..added68 100755 --- a/redis.go +++ b/redis.go @@ -3,21 +3,25 @@ package redis import ( "context" "errors" + "fmt" "reflect" "strings" + "sync" "sync/atomic" "time" goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/store" + "go.unistack.org/micro/v3/util/id" pool "go.unistack.org/micro/v3/util/xpool" ) var ( - DefaultPathSeparator = "/" - - DefaultUniversalOptions = &goredis.UniversalOptions{ + _ store.Store = (*Store)(nil) + _ store.Event = (*event)(nil) + sendEventTime = 10 * time.Millisecond + DefaultUniversalOptions = &goredis.UniversalOptions{ Username: "", Password: "", // no password set MaxRetries: 2, @@ -58,12 +62,14 @@ var ( type Store struct { cli goredis.UniversalClient pool *pool.StringsPool - connected *atomic.Bool + connected *atomic.Uint32 opts store.Options + watchers map[string]*watcher + mu sync.RWMutex } func (r *Store) Connect(ctx context.Context) error { - if r.connected.Load() { + if r.connected.Load() == 1 { return nil } if r.cli == nil { @@ -76,7 +82,7 @@ func (r *Store) Connect(ctx context.Context) error { setSpanError(ctx, err) return err } - r.connected.Store(true) + r.connected.Store(1) return nil } @@ -112,7 +118,7 @@ func (r *Store) ClusterClient() *goredis.ClusterClient { } func (r *Store) Disconnect(ctx context.Context) error { - if !r.connected.Load() { + if r.connected.Load() == 0 { return nil } @@ -122,7 +128,7 @@ func (r *Store) Disconnect(ctx context.Context) error { } } - r.connected.Store(false) + r.connected.Store(1) return nil } @@ -650,10 +656,11 @@ func (r *Store) String() string { } func NewStore(opts ...store.Option) *Store { - b := atomic.Bool{} + b := atomic.Uint32{} return &Store{ opts: store.NewOptions(opts...), connected: &b, + watchers: make(map[string]*watcher), } } @@ -745,7 +752,7 @@ func (r *Store) configure() error { r.cli = goredis.NewUniversalClient(universalOptions) setTracing(r.cli, r.opts.Tracer) - r.cli.AddHook(newEventHook(r.connected)) + r.cli.AddHook(newEventHook(r)) r.pool = pool.NewStringsPool(50) @@ -760,8 +767,100 @@ func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace str } if opNamespace != "" { b.WriteString(opNamespace) - b.WriteString(DefaultPathSeparator) + b.WriteString(r.opts.Separator) } b.WriteString(key) return b.String() } + +func (r *Store) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) { + id, err := id.New() + if err != nil { + return nil, err + } + wo, err := store.NewWatchOptions(opts...) + if err != nil { + return nil, err + } + // construct the watcher + w := &watcher{ + exit: make(chan bool), + ch: make(chan store.Event), + id: id, + opts: wo, + } + + r.mu.Lock() + r.watchers[w.id] = w + r.mu.Unlock() + + return w, nil +} + +func (r *Store) sendEvent(e store.Event) { + r.mu.RLock() + watchers := make([]*watcher, 0, len(r.watchers)) + for _, w := range r.watchers { + watchers = append(watchers, w) + } + r.mu.RUnlock() + fmt.Printf("evt %#+v\n", e) + for _, w := range watchers { + select { + case <-w.exit: + r.mu.Lock() + delete(r.watchers, w.id) + r.mu.Unlock() + default: + select { + case w.ch <- e: + case <-time.After(sendEventTime): + } + } + } +} + +type watcher struct { + ch chan store.Event + exit chan bool + opts store.WatchOptions + id string +} + +func (w *watcher) Next() (store.Event, error) { + for { + select { + case e := <-w.ch: + return e, nil + case <-w.exit: + return nil, store.ErrWatcherStopped + } + } +} + +func (w *watcher) Stop() { + select { + case <-w.exit: + return + default: + close(w.exit) + } +} + +type event struct { + ts time.Time + t store.EventType + err error +} + +func (e *event) Error() error { + return e.err +} + +func (e *event) Timestamp() time.Time { + return e.ts +} + +func (e *event) Type() store.EventType { + return e.t +}