Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
de72a10973 | |||
62c2de51d4 | |||
741b2310ec |
165
redis.go
165
redis.go
@@ -10,6 +10,7 @@ import (
|
|||||||
redis "github.com/redis/go-redis/v9"
|
redis "github.com/redis/go-redis/v9"
|
||||||
"go.unistack.org/micro/v3/semconv"
|
"go.unistack.org/micro/v3/semconv"
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v3/store"
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
pool "go.unistack.org/micro/v3/util/xpool"
|
pool "go.unistack.org/micro/v3/util/xpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -44,26 +45,20 @@ var (
|
|||||||
|
|
||||||
type Store struct {
|
type Store struct {
|
||||||
opts store.Options
|
opts store.Options
|
||||||
cli redisClient
|
cli *wrappedClient
|
||||||
pool pool.Pool[strings.Builder]
|
pool pool.Pool[*strings.Builder]
|
||||||
}
|
}
|
||||||
|
|
||||||
type redisClient interface {
|
type wrappedClient struct {
|
||||||
Get(ctx context.Context, key string) *redis.StringCmd
|
*redis.Client
|
||||||
Del(ctx context.Context, keys ...string) *redis.IntCmd
|
*redis.ClusterClient
|
||||||
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
|
|
||||||
Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
|
|
||||||
MGet(ctx context.Context, keys ...string) *redis.SliceCmd
|
|
||||||
MSet(ctx context.Context, kv ...interface{}) *redis.StatusCmd
|
|
||||||
Exists(ctx context.Context, keys ...string) *redis.IntCmd
|
|
||||||
Ping(ctx context.Context) *redis.StatusCmd
|
|
||||||
Pipeline() redis.Pipeliner
|
|
||||||
Pipelined(ctx context.Context, fn func(redis.Pipeliner) error) ([]redis.Cmder, error)
|
|
||||||
Close() error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) Connect(ctx context.Context) error {
|
func (r *Store) Connect(ctx context.Context) error {
|
||||||
return r.cli.Ping(ctx).Err()
|
if r.cli.Client != nil {
|
||||||
|
return r.cli.Client.Ping(ctx).Err()
|
||||||
|
}
|
||||||
|
return r.cli.ClusterClient.Ping(ctx).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) Init(opts ...store.Option) error {
|
func (r *Store) Init(opts ...store.Option) error {
|
||||||
@@ -74,12 +69,25 @@ func (r *Store) Init(opts ...store.Option) error {
|
|||||||
return r.configure()
|
return r.configure()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) Redis() *redis.Client {
|
func (r *Store) Client() *redis.Client {
|
||||||
return r.cli.(*redis.Client)
|
if r.cli.Client != nil {
|
||||||
|
return r.cli.Client
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Store) ClusterClient() *redis.ClusterClient {
|
||||||
|
if r.cli.ClusterClient != nil {
|
||||||
|
return r.cli.ClusterClient
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) Disconnect(ctx context.Context) error {
|
func (r *Store) Disconnect(ctx context.Context) error {
|
||||||
return r.cli.Close()
|
if r.cli.Client != nil {
|
||||||
|
return r.cli.Client.Close()
|
||||||
|
}
|
||||||
|
return r.cli.ClusterClient.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
|
||||||
@@ -97,12 +105,18 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
|
|||||||
}
|
}
|
||||||
|
|
||||||
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
|
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
|
||||||
ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey)
|
ctx, sp := r.opts.Tracer.Start(ctx, "cache exists "+rkey)
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
val, err := r.cli.Exists(ctx, rkey).Result()
|
var err error
|
||||||
|
var val int64
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
val, err = r.cli.Client.Exists(ctx, rkey).Result()
|
||||||
|
} else {
|
||||||
|
val, err = r.cli.ClusterClient.Exists(ctx, rkey).Result()
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -113,6 +127,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -134,9 +149,19 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey)
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
buf, err := r.cli.Get(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Bytes()
|
var buf []byte
|
||||||
|
var err error
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
buf, err = r.cli.Client.Get(ctx, rkey).Bytes()
|
||||||
|
} else {
|
||||||
|
buf, err = r.cli.ClusterClient.Get(ctx, rkey).Bytes()
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -147,6 +172,7 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -157,7 +183,9 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
|
|||||||
case *string:
|
case *string:
|
||||||
*b = string(buf)
|
*b = string(buf)
|
||||||
default:
|
default:
|
||||||
err = r.opts.Codec.Unmarshal(buf, val)
|
if err = r.opts.Codec.Unmarshal(buf, val); err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@@ -183,9 +211,18 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mread %v", keys))
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
rvals, err := r.cli.MGet(ctx, keys...).Result()
|
var rvals []interface{}
|
||||||
|
var err error
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
rvals, err = r.cli.Client.MGet(ctx, keys...).Result()
|
||||||
|
} else {
|
||||||
|
rvals, err = r.cli.ClusterClient.MGet(ctx, keys...).Result()
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -196,6 +233,7 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -237,10 +275,12 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
|
|||||||
|
|
||||||
itm.Set(reflect.New(vt.Elem()))
|
itm.Set(reflect.New(vt.Elem()))
|
||||||
if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil {
|
if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
vv.Set(nvv)
|
vv.Set(nvv)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -264,9 +304,17 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mdelete %v", keys))
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := r.cli.Del(ctx, keys...).Err()
|
var err error
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
err = r.cli.Client.Del(ctx, keys...).Err()
|
||||||
|
} else {
|
||||||
|
err = r.cli.ClusterClient.Del(ctx, keys...).Err()
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -277,6 +325,7 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -298,9 +347,17 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache delete %v", key))
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := r.cli.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
|
var err error
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
err = r.cli.Client.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
|
||||||
|
} else {
|
||||||
|
err = r.cli.ClusterClient.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -311,6 +368,7 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -332,6 +390,9 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mwrite %v", keys))
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
kvs := make([]string, 0, len(keys)*2)
|
kvs := make([]string, 0, len(keys)*2)
|
||||||
|
|
||||||
for idx, key := range keys {
|
for idx, key := range keys {
|
||||||
@@ -345,6 +406,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
|
|||||||
default:
|
default:
|
||||||
buf, err := r.opts.Codec.Marshal(vt)
|
buf, err := r.opts.Codec.Marshal(vt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
kvs = append(kvs, string(buf))
|
kvs = append(kvs, string(buf))
|
||||||
@@ -354,14 +416,23 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
|
|||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
|
|
||||||
cmds, err := r.cli.Pipelined(ctx, func(pipe redis.Pipeliner) error {
|
pipeliner := func(pipe redis.Pipeliner) error {
|
||||||
for idx := 0; idx < len(kvs); idx += 2 {
|
for idx := 0; idx < len(kvs); idx += 2 {
|
||||||
if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil {
|
if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var cmds []redis.Cmder
|
||||||
|
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
cmds, err = r.cli.Client.Pipelined(ctx, pipeliner)
|
||||||
|
} else {
|
||||||
|
cmds, err = r.cli.ClusterClient.Pipelined(ctx, pipeliner)
|
||||||
|
}
|
||||||
|
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
@@ -373,6 +444,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -383,6 +455,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
|
|||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
|
||||||
return store.ErrNotFound
|
return store.ErrNotFound
|
||||||
}
|
}
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -405,6 +478,10 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache write %v", rkey))
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
var buf []byte
|
var buf []byte
|
||||||
switch vt := val.(type) {
|
switch vt := val.(type) {
|
||||||
case string:
|
case string:
|
||||||
@@ -415,13 +492,19 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
|
|||||||
var err error
|
var err error
|
||||||
buf, err = r.opts.Codec.Marshal(val)
|
buf, err = r.opts.Codec.Marshal(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
err := r.cli.Set(ctx, r.getKey(r.opts.Namespace, options.Namespace, key), buf, options.TTL).Err()
|
var err error
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
err = r.cli.Client.Set(ctx, rkey, buf, options.TTL).Err()
|
||||||
|
} else {
|
||||||
|
err = r.cli.ClusterClient.Set(ctx, rkey, buf, options.TTL).Err()
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -432,6 +515,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -461,10 +545,27 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache list %v", rkey))
|
||||||
|
defer sp.Finish()
|
||||||
|
|
||||||
// TODO: add support for prefix/suffix/limit
|
// TODO: add support for prefix/suffix/limit
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
|
||||||
ts := time.Now()
|
ts := time.Now()
|
||||||
keys, err := r.cli.Keys(ctx, rkey).Result()
|
var keys []string
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if r.cli.Client != nil {
|
||||||
|
keys, err = r.cli.Client.Keys(ctx, rkey).Result()
|
||||||
|
} else {
|
||||||
|
err = r.cli.ClusterClient.ForEachMaster(ctx, func(nctx context.Context, cli *redis.Client) error {
|
||||||
|
nkeys, nerr := cli.Keys(nctx, rkey).Result()
|
||||||
|
if nerr != nil {
|
||||||
|
return nerr
|
||||||
|
}
|
||||||
|
keys = append(keys, nkeys...)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
te := time.Since(ts)
|
te := time.Since(ts)
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
|
||||||
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
|
||||||
@@ -475,6 +576,7 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
|
|||||||
} else if err == nil {
|
} else if err == nil {
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -563,11 +665,12 @@ func (r *Store) configure() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions != nil {
|
if redisOptions != nil {
|
||||||
r.cli = redis.NewClient(redisOptions)
|
r.cli = &wrappedClient{Client: redis.NewClient(redisOptions)}
|
||||||
} else if redisClusterOptions != nil {
|
} else if redisClusterOptions != nil {
|
||||||
r.cli = redis.NewClusterClient(redisClusterOptions)
|
r.cli = &wrappedClient{ClusterClient: redis.NewClusterClient(redisClusterOptions)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.pool = pool.NewPool(func() *strings.Builder { return &strings.Builder{} })
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -7,14 +7,13 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"go.unistack.org/micro/v3/store"
|
"go.unistack.org/micro/v3/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_rkv_configure(t *testing.T) {
|
func Test_rkv_configure(t *testing.T) {
|
||||||
type fields struct {
|
type fields struct {
|
||||||
options store.Options
|
options store.Options
|
||||||
Client *redis.Client
|
Client *wrappedClient
|
||||||
}
|
}
|
||||||
type wantValues struct {
|
type wantValues struct {
|
||||||
username string
|
username string
|
||||||
|
Reference in New Issue
Block a user