improve metrics and tracing
Some checks failed
build / lint (push) Successful in 40s
build / test (push) Failing after 1m43s
codeql / analyze (go) (push) Failing after 3m9s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-07-04 15:13:19 +03:00
parent de72a10973
commit aed9512b93
7 changed files with 395 additions and 79 deletions

154
redis.go
View File

@@ -46,6 +46,7 @@ var (
type Store struct {
opts store.Options
cli *wrappedClient
done chan struct{}
pool pool.Pool[*strings.Builder]
}
@@ -66,7 +67,12 @@ func (r *Store) Init(opts ...store.Option) error {
o(&r.opts)
}
return r.configure()
err := r.configure()
if err != nil {
return err
}
return nil
}
func (r *Store) Client() *redis.Client {
@@ -84,10 +90,21 @@ func (r *Store) ClusterClient() *redis.ClusterClient {
}
func (r *Store) Disconnect(ctx context.Context) error {
if r.cli.Client != nil {
return r.cli.Client.Close()
var err error
select {
case <-r.done:
return err
default:
if r.cli.Client != nil {
err = r.cli.Client.Close()
} else if r.cli.ClusterClient != nil {
err = r.cli.ClusterClient.Close()
}
close(r.done)
return err
}
return r.cli.ClusterClient.Close()
return err
}
func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
@@ -108,7 +125,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
ctx, sp := r.opts.Tracer.Start(ctx, "cache exists "+rkey)
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var err error
var val int64
@@ -118,17 +135,17 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
val, err = r.cli.ClusterClient.Exists(ctx, rkey).Result()
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil || (err == nil && val == 0) {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@@ -153,7 +170,7 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey)
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var buf []byte
var err error
@@ -163,17 +180,17 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
buf, err = r.cli.ClusterClient.Get(ctx, rkey).Bytes()
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil || (err == nil && buf == nil) {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@@ -214,7 +231,7 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mread %v", keys))
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var rvals []interface{}
var err error
@@ -224,17 +241,17 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
rvals, err = r.cli.ClusterClient.MGet(ctx, keys...).Result()
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil || (len(rvals) == 0) {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@@ -307,7 +324,7 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mdelete %v", keys))
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var err error
if r.cli.Client != nil {
@@ -316,17 +333,17 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
err = r.cli.ClusterClient.Del(ctx, keys...).Err()
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@@ -350,7 +367,7 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache delete %v", key))
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var err error
if r.cli.Client != nil {
@@ -359,17 +376,17 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
err = r.cli.ClusterClient.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@@ -413,7 +430,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
}
}
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
pipeliner := func(pipe redis.Pipeliner) error {
@@ -435,28 +452,28 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
for _, cmd := range cmds {
if err = cmd.Err(); err != nil {
if err == redis.Nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
}
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
}
@@ -497,7 +514,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
}
}
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var err error
if r.cli.Client != nil {
@@ -506,17 +523,17 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
err = r.cli.ClusterClient.Set(ctx, rkey, buf, options.TTL).Err()
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@@ -549,7 +566,7 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
defer sp.Finish()
// TODO: add support for prefix/suffix/limit
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
ts := time.Now()
var keys []string
var err error
@@ -567,17 +584,17 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
})
}
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
if err == redis.Nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
return nil, store.ErrNotFound
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc()
return nil, err
}
@@ -609,7 +626,7 @@ func (r *Store) String() string {
}
func NewStore(opts ...store.Option) *Store {
return &Store{opts: store.NewOptions(opts...)}
return &Store{done: make(chan struct{}), opts: store.NewOptions(opts...)}
}
func (r *Store) configure() error {
@@ -665,12 +682,19 @@ func (r *Store) configure() error {
}
if redisOptions != nil {
r.cli = &wrappedClient{Client: redis.NewClient(redisOptions)}
c := redis.NewClient(redisOptions)
setTracing(c, r.opts.Tracer)
r.cli = &wrappedClient{Client: c}
} else if redisClusterOptions != nil {
r.cli = &wrappedClient{ClusterClient: redis.NewClusterClient(redisClusterOptions)}
c := redis.NewClusterClient(redisClusterOptions)
setTracing(c, r.opts.Tracer)
r.cli = &wrappedClient{ClusterClient: c}
}
r.pool = pool.NewPool(func() *strings.Builder { return &strings.Builder{} })
r.statsMeter()
return nil
}