diff --git a/event.go b/event.go new file mode 100644 index 0000000..6f1de6d --- /dev/null +++ b/event.go @@ -0,0 +1,55 @@ +package redis + +import ( + "context" + "errors" + "net" + "sync/atomic" + + goredis "github.com/redis/go-redis/v9" +) + +type eventHook struct { + connected *atomic.Bool +} + +var _ goredis.Hook = (*eventHook)(nil) + +func newEventHook(connected *atomic.Bool) *eventHook { + return &eventHook{connected: connected} +} + +func (h *eventHook) DialHook(hook goredis.DialHook) goredis.DialHook { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := hook(ctx, network, addr) + if err != nil && !isRedisError(err) { + h.connected.Store(false) + } + return conn, err + } +} + +func (h *eventHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { + return func(ctx context.Context, cmd goredis.Cmder) error { + err := hook(ctx, cmd) + if err != nil && !isRedisError(err) { + h.connected.Store(false) + } + return err + } +} + +func (h *eventHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook { + return func(ctx context.Context, cmds []goredis.Cmder) error { + err := hook(ctx, cmds) + if err != nil && !isRedisError(err) { + h.connected.Store(false) + } + return err + } +} + +func isRedisError(err error) bool { + var rerr goredis.Error + return errors.As(err, &rerr) +} diff --git a/go.mod b/go.mod index e00ab6c..672c006 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.4 require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 github.com/redis/go-redis/v9 v9.7.0 - go.unistack.org/micro/v3 v3.10.105 + go.unistack.org/micro/v3 v3.10.106 ) require ( diff --git a/go.sum b/go.sum index 706c028..d3b7480 100644 --- a/go.sum +++ b/go.sum @@ -14,7 +14,7 @@ github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= -go.unistack.org/micro/v3 v3.10.105 h1:JYNV0d+fnR7Hy8d4/sjr+25DbSNqq1Z7IPeDDdB+f1I= -go.unistack.org/micro/v3 v3.10.105/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= +go.unistack.org/micro/v3 v3.10.106 h1:ya4+n58l4PImtrIKrJi1GgkUuJ1gmzLYa9WKYI1JFLs= +go.unistack.org/micro/v3 v3.10.106/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/redis.go b/redis.go index 20b1b07..1ca10e0 100755 --- a/redis.go +++ b/redis.go @@ -55,11 +55,10 @@ var ( ) type Store struct { - opts store.Options - cli goredis.UniversalClient - done chan struct{} - pool *pool.StringsPool - isConnected atomic.Int32 + opts store.Options + cli goredis.UniversalClient + pool *pool.StringsPool + connected *atomic.Bool } func (r *Store) Connect(ctx context.Context) error { @@ -69,8 +68,12 @@ func (r *Store) Connect(ctx context.Context) error { if r.opts.LazyConnect { return nil } - return r.connect(ctx) - + if err := r.cli.Ping(ctx).Err(); err != nil { + setSpanError(ctx, err) + return err + } + r.connected.Store(true) + return nil } func (r *Store) Init(opts ...store.Option) error { @@ -105,26 +108,21 @@ func (r *Store) ClusterClient() *goredis.ClusterClient { } func (r *Store) Disconnect(ctx context.Context) error { - var err error - select { - case <-r.done: - return err - default: - if r.cli != nil { - if err = r.cli.Close(); err != nil { - r.isConnected.Store(0) - } - } - close(r.done) - return err + if !r.connected.Load() { + return nil } + + if r.cli != nil { + if err := r.cli.Close(); err != nil { + return err + } + } + + r.connected.Store(false) + return nil } func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error { - if err := r.connect(ctx); err != nil { - return err - } - b := r.pool.Get() defer r.pool.Put(b) options := store.NewExistsOptions(opts...) @@ -155,7 +153,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti return store.ErrNotFound } else if err == nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() - } else if err != nil { + } else { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -164,10 +162,6 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti } func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { - if err := r.connect(ctx); err != nil { - return err - } - b := r.pool.Get() defer r.pool.Put(b) @@ -219,10 +213,6 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s } func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error { - if err := r.connect(ctx); err != nil { - return err - } - options := store.NewReadOptions(opts...) timeout := r.opts.Timeout @@ -322,10 +312,6 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts } func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error { - if err := r.connect(ctx); err != nil { - return err - } - options := store.NewDeleteOptions(opts...) timeout := r.opts.Timeout @@ -381,10 +367,6 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { - if err := r.connect(ctx); err != nil { - return err - } - b := r.pool.Get() defer r.pool.Put(b) @@ -423,10 +405,6 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti } func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error { - if err := r.connect(ctx); err != nil { - return err - } - options := store.NewWriteOptions(opts...) timeout := r.opts.Timeout @@ -510,10 +488,6 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { - if err := r.connect(ctx); err != nil { - return err - } - b := r.pool.Get() defer r.pool.Put(b) @@ -560,7 +534,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... return store.ErrNotFound } else if err == nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() - } else if err != nil { + } else { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -569,10 +543,6 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... } func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) { - if err := r.connect(ctx); err != nil { - return nil, err - } - b := r.pool.Get() defer r.pool.Put(b) @@ -659,7 +629,11 @@ func (r *Store) String() string { } func NewStore(opts ...store.Option) *Store { - return &Store{done: make(chan struct{}), opts: store.NewOptions(opts...)} + b := atomic.Bool{} + return &Store{ + opts: store.NewOptions(opts...), + connected: &b, + } } func (r *Store) configure() error { @@ -750,6 +724,7 @@ func (r *Store) configure() error { r.cli = goredis.NewUniversalClient(universalOptions) setTracing(r.cli, r.opts.Tracer) + r.cli.AddHook(newEventHook(r.connected)) r.pool = pool.NewStringsPool(50) @@ -769,14 +744,3 @@ func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace str b.WriteString(key) return b.String() } - -func (r *Store) connect(ctx context.Context) error { - if r.isConnected.Load() == 0 { - if err := r.cli.Ping(ctx).Err(); err != nil { - setSpanError(ctx, err) - return err - } - } - r.isConnected.Store(1) - return nil -} diff --git a/redis_test.go b/redis_test.go index 4fa7a20..b5a3ae2 100755 --- a/redis_test.go +++ b/redis_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "os" + "sync/atomic" "testing" "time" @@ -20,7 +21,7 @@ func TestKeepTTL(t *testing.T) { } r := NewStore(store.Addrs(os.Getenv("STORE_NODES"))) - if err := r.Init(); err != nil { + if err := r.Init(store.LazyConnect(true)); err != nil { t.Fatal(err) } if err := r.Connect(ctx); err != nil { @@ -99,9 +100,11 @@ func Test_rkv_configure(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + b := atomic.Bool{} rc := &Store{ - opts: tt.fields.options, - cli: tt.fields.Client, + opts: tt.fields.options, + cli: tt.fields.Client, + connected: &b, } err := rc.configure() if (err != nil) != tt.wantErr { diff --git a/tracer.go b/tracer.go index 9df06fe..4936600 100644 --- a/tracer.go +++ b/tracer.go @@ -73,7 +73,7 @@ func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook case "cluster slots": break default: - _, span := h.tr.Start(ctx, "goredis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) + _, span := h.tr.Start(ctx, "sdk.database", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) defer func() { recordError(span, err) span.Finish() @@ -95,7 +95,7 @@ func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) gore "db.statement", cmdsString, )) - _, span := h.tr.Start(ctx, "goredis.process_pipeline", opts...) + _, span := h.tr.Start(ctx, "sdk.database", opts...) defer span.Finish() err := hook(ctx, cmds)