diff --git a/event.go b/event.go index cdfe75e..20251a4 100644 --- a/event.go +++ b/event.go @@ -4,19 +4,20 @@ import ( "context" "errors" "net" - "sync/atomic" + "time" goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/store" ) type eventHook struct { - connected *atomic.Bool + s *Store } var _ goredis.Hook = (*eventHook)(nil) -func newEventHook(connected *atomic.Bool) *eventHook { - return &eventHook{connected: connected} +func newEventHook(s *Store) *eventHook { + return &eventHook{s: s} } func (h *eventHook) DialHook(hook goredis.DialHook) goredis.DialHook { @@ -24,11 +25,16 @@ func (h *eventHook) DialHook(hook goredis.DialHook) goredis.DialHook { conn, err := hook(ctx, network, addr) if err != nil { if !isRedisError(err) { - h.connected.Store(false) + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) } - h.connected.Store(true) } else { - h.connected.Store(true) + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } } return conn, err } @@ -39,11 +45,16 @@ func (h *eventHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { err := hook(ctx, cmd) if err != nil { if !isRedisError(err) { - h.connected.Store(false) + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) } - h.connected.Store(true) } else { - h.connected.Store(true) + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } } return err } @@ -54,11 +65,16 @@ func (h *eventHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredi err := hook(ctx, cmds) if err != nil { if !isRedisError(err) { - h.connected.Store(false) + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) } - h.connected.Store(true) } else { - h.connected.Store(true) + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } } return err } diff --git a/go.mod b/go.mod index 672c006..39ceb37 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.4 require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 github.com/redis/go-redis/v9 v9.7.0 - go.unistack.org/micro/v3 v3.10.106 + go.unistack.org/micro/v3 v3.10.108 ) require ( diff --git a/go.sum b/go.sum index d3b7480..61ce849 100644 --- a/go.sum +++ b/go.sum @@ -14,7 +14,7 @@ github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.10.106 h1:ya4+n58l4PImtrIKrJi1GgkUuJ1gmzLYa9WKYI1JFLs= -go.unistack.org/micro/v3 v3.10.106/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= +go.unistack.org/micro/v3 v3.10.108 h1:3L7SkilMVLtH8y3pQIPtr3jjQYrf0AMv1oAkoL3nFkE= +go.unistack.org/micro/v3 v3.10.108/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/redis.go b/redis.go index dec322d..392e9de 100755 --- a/redis.go +++ b/redis.go @@ -5,19 +5,22 @@ import ( "errors" "reflect" "strings" + "sync" "sync/atomic" "time" goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/store" + "go.unistack.org/micro/v3/util/id" pool "go.unistack.org/micro/v3/util/xpool" ) var ( - DefaultPathSeparator = "/" - - DefaultUniversalOptions = &goredis.UniversalOptions{ + _ store.Store = (*Store)(nil) + _ store.Event = (*event)(nil) + sendEventTime = 10 * time.Millisecond + DefaultUniversalOptions = &goredis.UniversalOptions{ Username: "", Password: "", // no password set MaxRetries: 2, @@ -58,12 +61,14 @@ var ( type Store struct { cli goredis.UniversalClient pool *pool.StringsPool - connected *atomic.Bool + connected *atomic.Uint32 opts store.Options + watchers map[string]*watcher + mu sync.RWMutex } func (r *Store) Connect(ctx context.Context) error { - if r.connected.Load() { + if r.connected.Load() == 1 { return nil } if r.cli == nil { @@ -76,7 +81,7 @@ func (r *Store) Connect(ctx context.Context) error { setSpanError(ctx, err) return err } - r.connected.Store(true) + r.connected.Store(1) return nil } @@ -112,7 +117,7 @@ func (r *Store) ClusterClient() *goredis.ClusterClient { } func (r *Store) Disconnect(ctx context.Context) error { - if !r.connected.Load() { + if r.connected.Load() == 0 { return nil } @@ -122,7 +127,7 @@ func (r *Store) Disconnect(ctx context.Context) error { } } - r.connected.Store(false) + r.connected.Store(1) return nil } @@ -650,10 +655,11 @@ func (r *Store) String() string { } func NewStore(opts ...store.Option) *Store { - b := atomic.Bool{} + b := atomic.Uint32{} return &Store{ opts: store.NewOptions(opts...), connected: &b, + watchers: make(map[string]*watcher), } } @@ -745,7 +751,7 @@ func (r *Store) configure() error { r.cli = goredis.NewUniversalClient(universalOptions) setTracing(r.cli, r.opts.Tracer) - r.cli.AddHook(newEventHook(r.connected)) + r.cli.AddHook(newEventHook(r)) r.pool = pool.NewStringsPool(50) @@ -760,8 +766,99 @@ func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace str } if opNamespace != "" { b.WriteString(opNamespace) - b.WriteString(DefaultPathSeparator) + b.WriteString(r.opts.Separator) } b.WriteString(key) return b.String() } + +func (r *Store) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) { + id, err := id.New() + if err != nil { + return nil, err + } + wo, err := store.NewWatchOptions(opts...) + if err != nil { + return nil, err + } + // construct the watcher + w := &watcher{ + exit: make(chan bool), + ch: make(chan store.Event), + id: id, + opts: wo, + } + + r.mu.Lock() + r.watchers[w.id] = w + r.mu.Unlock() + + return w, nil +} + +func (r *Store) sendEvent(e store.Event) { + r.mu.RLock() + watchers := make([]*watcher, 0, len(r.watchers)) + for _, w := range r.watchers { + watchers = append(watchers, w) + } + r.mu.RUnlock() + for _, w := range watchers { + select { + case <-w.exit: + r.mu.Lock() + delete(r.watchers, w.id) + r.mu.Unlock() + default: + select { + case w.ch <- e: + case <-time.After(sendEventTime): + } + } + } +} + +type watcher struct { + ch chan store.Event + exit chan bool + opts store.WatchOptions + id string +} + +func (w *watcher) Next() (store.Event, error) { + for { + select { + case e := <-w.ch: + return e, nil + case <-w.exit: + return nil, store.ErrWatcherStopped + } + } +} + +func (w *watcher) Stop() { + select { + case <-w.exit: + return + default: + close(w.exit) + } +} + +type event struct { + ts time.Time + t store.EventType + err error +} + +func (e *event) Error() error { + return e.err +} + +func (e *event) Timestamp() time.Time { + return e.ts +} + +func (e *event) Type() store.EventType { + return e.t +}