From edb352fd48190fddef8f6f95c142a0cc09d72d40 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 26 Oct 2022 13:46:48 +0300 Subject: [PATCH] add MGet/Mset methods Signed-off-by: Vasiliy Tolstov --- redis.go | 140 ++++++++++++++++++++++++++++++++++++++++---------- redis_test.go | 131 ++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 233 insertions(+), 38 deletions(-) mode change 100644 => 100755 redis.go mode change 100644 => 100755 redis_test.go diff --git a/redis.go b/redis.go old mode 100644 new mode 100755 index 6051e42..01cc2e6 --- a/redis.go +++ b/redis.go @@ -3,6 +3,8 @@ package redis // import "go.unistack.org/micro-store-redis/v3" import ( "context" "fmt" + "reflect" + "strings" "time" "github.com/go-redis/redis/v8" @@ -14,18 +16,20 @@ type rkv struct { cli redisClient } -// TODO: add ability to set some redis options https://pkg.go.dev/github.com/go-redis/redis/v8#Options type redisClient interface { Get(ctx context.Context, key string) *redis.StringCmd Del(ctx context.Context, keys ...string) *redis.IntCmd Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd Keys(ctx context.Context, pattern string) *redis.StringSliceCmd + MGet(ctx context.Context, keys ...string) *redis.SliceCmd + MSet(ctx context.Context, kv ...interface{}) *redis.StatusCmd Exists(ctx context.Context, keys ...string) *redis.IntCmd + Ping(ctx context.Context) *redis.StatusCmd Close() error } func (r *rkv) Connect(ctx context.Context) error { - return nil + return r.cli.Ping(ctx).Err() } func (r *rkv) Init(opts ...store.Option) error { @@ -45,17 +49,19 @@ func (r *rkv) Exists(ctx context.Context, key string, opts ...store.ExistsOption if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } + if options.Namespace != "" { + key = fmt.Sprintf("%s%s", r.opts.Namespace, key) + } if r.opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) defer cancel() } - rkey := fmt.Sprintf("%s%s", options.Namespace, key) - st, err := r.cli.Exists(ctx, rkey).Result() + val, err := r.cli.Exists(ctx, key).Result() if err != nil { return err } - if st == 0 { + if val == 0 { return store.ErrNotFound } return nil @@ -66,13 +72,15 @@ func (r *rkv) Read(ctx context.Context, key string, val interface{}, opts ...sto if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } + if options.Namespace != "" { + key = fmt.Sprintf("%s%s", options.Namespace, key) + } if r.opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) defer cancel() } - rkey := fmt.Sprintf("%s%s", options.Namespace, key) - buf, err := r.cli.Get(ctx, rkey).Bytes() + buf, err := r.cli.Get(ctx, key).Bytes() if err != nil && err == redis.Nil { return store.ErrNotFound } else if err != nil { @@ -81,34 +89,97 @@ func (r *rkv) Read(ctx context.Context, key string, val interface{}, opts ...sto if buf == nil { return store.ErrNotFound } - /* - d, err := r.Client.TTL(rkey).Result() - if err != nil { - return nil, err - } - - records = append(records, &store.Record{ - Key: key, - Value: val, - Expiry: d, - }) - } - */ return r.opts.Codec.Unmarshal(buf, val) } +func (r *rkv) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error { + if len(keys) == 1 { + vt := reflect.ValueOf(vals) + if vt.Kind() == reflect.Ptr { + vt = reflect.Indirect(vt) + return r.Read(ctx, keys[0], vt.Index(0).Interface(), opts...) + } + } + options := store.NewReadOptions(opts...) + rkeys := make([]string, 0, len(keys)) + if len(options.Namespace) == 0 { + options.Namespace = r.opts.Namespace + } + for _, key := range keys { + if options.Namespace != "" { + rkeys = append(rkeys, fmt.Sprintf("%s%s", options.Namespace, key)) + } else { + rkeys = append(rkeys, key) + } + } + if r.opts.Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + defer cancel() + } + rvals, err := r.cli.MGet(ctx, rkeys...).Result() + if err != nil && err == redis.Nil { + return store.ErrNotFound + } else if err != nil { + return err + } + if len(rvals) == 0 { + return store.ErrNotFound + } + + vv := reflect.ValueOf(vals) + vt := reflect.TypeOf(vals) + switch vv.Kind() { + case reflect.Ptr: + vv = reflect.Indirect(vv) + vt = vt.Elem() + } + if vv.Kind() != reflect.Slice { + return store.ErrNotFound + } + nvv := reflect.MakeSlice(vt, len(rvals), len(rvals)) + vt = vt.Elem() + for idx := 0; idx < len(rvals); idx++ { + if rvals[idx] == nil { + continue + } + + itm := nvv.Index(idx) + var buf []byte + switch b := rvals[idx].(type) { + case []byte: + buf = b + case string: + buf = []byte(b) + } + // special case for raw data + if vt.Kind() == reflect.Slice && vt.Elem().Kind() == reflect.Uint8 { + itm.Set(reflect.MakeSlice(itm.Type(), len(buf), len(buf))) + } else { + itm.Set(reflect.New(vt.Elem())) + } + if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil { + return err + } + } + vv.Set(nvv) + return nil +} + func (r *rkv) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { options := store.NewDeleteOptions(opts...) if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } + if options.Namespace == "" { + return r.cli.Del(ctx, key).Err() + } if r.opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) defer cancel() } - rkey := fmt.Sprintf("%s%s", options.Namespace, key) - return r.cli.Del(ctx, rkey).Err() + return r.cli.Del(ctx, fmt.Sprintf("%s%s", options.Namespace, key)).Err() } func (r *rkv) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { @@ -116,18 +187,19 @@ func (r *rkv) Write(ctx context.Context, key string, val interface{}, opts ...st if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } - - rkey := fmt.Sprintf("%s%s", options.Namespace, key) buf, err := r.opts.Codec.Marshal(val) if err != nil { return err } + if options.Namespace != "" { + key = fmt.Sprintf("%s%s", options.Namespace, key) + } if r.opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) defer cancel() } - return r.cli.Set(ctx, rkey, buf, options.TTL).Err() + return r.cli.Set(ctx, key, buf, options.TTL).Err() } func (r *rkv) List(ctx context.Context, opts ...store.ListOption) ([]string, error) { @@ -135,18 +207,30 @@ func (r *rkv) List(ctx context.Context, opts ...store.ListOption) ([]string, err if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } + + rkey := fmt.Sprintf("%s%s*", options.Namespace, options.Prefix) + if options.Suffix != "" { + rkey += options.Suffix + } if r.opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) defer cancel() } // TODO: add support for prefix/suffix/limit - keys, err := r.cli.Keys(ctx, "*").Result() + keys, err := r.cli.Keys(ctx, rkey).Result() if err != nil { return nil, err } + if options.Namespace == "" { + return keys, nil + } - return keys, nil + nkeys := make([]string, 0, len(keys)) + for _, key := range keys { + nkeys = append(nkeys, strings.TrimPrefix(key, options.Namespace)) + } + return nkeys, nil } func (r *rkv) Options() store.Options { @@ -161,7 +245,7 @@ func (r *rkv) String() string { return "redis" } -func NewStore(opts ...store.Option) store.Store { +func NewStore(opts ...store.Option) *rkv { return &rkv{opts: store.NewOptions(opts...)} } diff --git a/redis_test.go b/redis_test.go old mode 100644 new mode 100755 index fb1d097..b4bdc17 --- a/redis_test.go +++ b/redis_test.go @@ -1,6 +1,7 @@ package redis import ( + "bytes" "context" "os" "testing" @@ -89,33 +90,143 @@ func Test_Store(t *testing.T) { if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { t.Skip() } - r := new(rkv) + r := NewStore(store.Addrs(os.Getenv("STORE_NODES"))) - // r.options = store.Options{Nodes: []string{"redis://:password@127.0.0.1:6379"}} - // r.options = store.Options{Nodes: []string{"127.0.0.1:6379"}} - - r.opts = store.NewOptions(store.Addrs(os.Getenv("STORE_NODES"))) - - if err := r.configure(); err != nil { + if err := r.Init(); err != nil { + t.Fatal(err) + } + if err := r.Connect(ctx); err != nil { t.Fatal(err) } key := "myTest" + tval := []byte("myValue") var val []byte - err := r.Write(ctx, key, []byte("myValue"), store.WriteTTL(2*time.Minute)) + err := r.Write(ctx, key, tval, store.WriteTTL(2*time.Minute)) if err != nil { t.Fatalf("Write error: %v", err) } - err = r.Read(ctx, key, val) + err = r.Read(ctx, key, &val) if err != nil { t.Fatalf("Read error: %v\n", err) + } else if !bytes.Equal(val, tval) { + t.Fatalf("read err: data not eq") } + keys, err := r.List(ctx) + if err != nil { + t.Fatalf("List error: %v\n", err) + } + _ = keys err = r.Delete(ctx, key) if err != nil { t.Fatalf("Delete error: %v\n", err) } - _, err = r.List(ctx) + // t.Logf("%v", keys) +} + +func Test_MRead(t *testing.T) { + ctx := context.Background() + var err error + if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { + t.Skip() + } + r := NewStore(store.Addrs(os.Getenv("STORE_NODES"))) + + if err = r.Init(); err != nil { + t.Fatal(err) + } + if err = r.Connect(ctx); err != nil { + t.Fatal(err) + } + + key1 := "myTest1" + key2 := "myTest2" + tval1 := []byte("myValue1") + tval2 := []byte("myValue2") + var vals [][]byte + err = r.Write(ctx, key1, tval1, store.WriteTTL(2*time.Minute)) + if err != nil { + t.Fatalf("Write error: %v", err) + } + err = r.Write(ctx, key2, tval2, store.WriteTTL(2*time.Minute)) + if err != nil { + t.Fatalf("Write error: %v", err) + } + err = r.MRead(ctx, []string{key1, key2}, &vals) + if err != nil { + t.Fatalf("Read error: %v\n", err) + } + // t.Logf("%s", vals) + _ = vals + keys, err := r.List(ctx) if err != nil { t.Fatalf("List error: %v\n", err) } + _ = keys + // t.Logf("%v", keys) + err = r.Delete(ctx, key1) + if err != nil { + t.Fatalf("Delete error: %v\n", err) + } + err = r.Delete(ctx, key2) + if err != nil { + t.Fatalf("Delete error: %v\n", err) + } } + +/* +func Test_MReadCodec(t *testing.T) { + type mytype struct { + Key string `json:"name"` + Val string `json:"val"` + } + ctx := context.Background() + var err error + if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { + t.Skip() + } + r := NewStore(store.Nodes(os.Getenv("STORE_NODES")), store.Codec(jsoncodec.NewCodec())) + + if err = r.Init(); err != nil { + t.Fatal(err) + } + if err = r.Connect(ctx); err != nil { + t.Fatal(err) + } + + key1 := "myTest1" + key2 := "myTest2" + key3 := "myTest3" + tval1 := &mytype{Key: "key1", Val: "val1"} + tval2 := &mytype{Key: "key2", Val: "val2"} + var vals []*mytype + err = r.Write(ctx, key1, tval1, store.WriteTTL(2*time.Minute)) + if err != nil { + t.Fatalf("Write error: %v", err) + } + err = r.Write(ctx, key2, tval2, store.WriteTTL(2*time.Minute)) + if err != nil { + t.Fatalf("Write error: %v", err) + } + err = r.MRead(ctx, []string{key1, key3, key2}, &vals) + if err != nil { + t.Fatalf("Read error: %v\n", err) + } + if vals[0].Key != "key1" || vals[1] != nil || vals[2].Key != "key2" { + t.Fatalf("read err: struct not filled") + } + keys, err := r.List(ctx) + if err != nil { + t.Fatalf("List error: %v\n", err) + } + _ = keys + err = r.Delete(ctx, key1) + if err != nil { + t.Fatalf("Delete error: %v\n", err) + } + err = r.Delete(ctx, key2) + if err != nil { + t.Fatalf("Delete error: %v\n", err) + } +} +*/