diff --git a/redis.go b/redis.go index 1ca10e0..e561e88 100755 --- a/redis.go +++ b/redis.go @@ -2,6 +2,7 @@ package redis import ( "context" + "errors" "reflect" "strings" "sync/atomic" @@ -126,6 +127,8 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti b := r.pool.Get() defer r.pool.Put(b) options := store.NewExistsOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "exists") timeout := r.opts.Timeout if options.Timeout > 0 { @@ -140,21 +143,22 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() ts := time.Now() val, err := r.cli.Exists(ctx, rkey).Result() setSpanError(ctx, err) te := time.Since(ts) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == goredis.Nil || (err == nil && val == 0) { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) || (err == nil && val == 0) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() } else { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } @@ -166,6 +170,8 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s defer r.pool.Put(b) options := store.NewReadOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "read") timeout := r.opts.Timeout if options.Timeout > 0 { @@ -180,21 +186,21 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() ts := time.Now() buf, err := r.cli.Get(ctx, rkey).Bytes() setSpanError(ctx, err) te := time.Since(ts) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == goredis.Nil || (err == nil && buf == nil) { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) || (err == nil && buf == nil) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() } else if err != nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } @@ -313,6 +319,8 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error { options := store.NewDeleteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "delete") timeout := r.opts.Timeout if options.Timeout > 0 { @@ -337,7 +345,7 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } } - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() ts := time.Now() var err error if r.opts.Namespace != "" || options.Namespace != "" { @@ -350,16 +358,16 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } setSpanError(ctx, err) te := time.Since(ts) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) if err == goredis.Nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() } else if err != nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } @@ -371,6 +379,8 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti defer r.pool.Put(b) options := store.NewDeleteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "delete") timeout := r.opts.Timeout if options.Timeout > 0 { @@ -383,21 +393,22 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti defer cancel() } - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() ts := time.Now() err := r.cli.Del(ctx, r.getKey(b, r.opts.Namespace, options.Namespace, key)).Err() te := time.Since(ts) setSpanError(ctx, err) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == goredis.Nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() } else if err != nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } @@ -406,6 +417,8 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error { options := store.NewWriteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "write") timeout := r.opts.Timeout if options.Timeout > 0 { @@ -439,7 +452,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } } - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() pipeliner := func(pipe goredis.Pipeliner) error { for idx := 0; idx < len(kvs); idx += 2 { @@ -459,27 +472,27 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o te := time.Since(ts) setSpanError(ctx, err) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) if err == goredis.Nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() } else if err != nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } for _, cmd := range cmds { if err = cmd.Err(); err != nil { if err == goredis.Nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } setSpanError(ctx, err) - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } } @@ -492,6 +505,8 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... defer r.pool.Put(b) options := store.NewWriteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "write") timeout := r.opts.Timeout if options.Timeout > 0 { @@ -520,22 +535,22 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... } } - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() ts := time.Now() err := r.cli.Set(ctx, rkey, buf, options.TTL).Err() te := time.Since(ts) setSpanError(ctx, err) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == goredis.Nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() } else { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } @@ -547,6 +562,9 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e defer r.pool.Put(b) options := store.NewListOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "list") + if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } @@ -568,7 +586,7 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e } // TODO: add support for prefix/suffix/limit - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() ts := time.Now() var keys []string var err error @@ -588,16 +606,16 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e te := time.Since(ts) setSpanError(ctx, err) - r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() - r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) - r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) if err == goredis.Nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return nil, store.ErrNotFound } else if err == nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "git")...).Inc() } else if err != nil { - r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return nil, err } diff --git a/redis_test.go b/redis_test.go index b5a3ae2..3632ef9 100755 --- a/redis_test.go +++ b/redis_test.go @@ -13,6 +13,26 @@ import ( "go.unistack.org/micro/v3/tracer" ) +func TestLazyConnect(t *testing.T) { + ctx := context.Background() + var err error + + r := NewStore() + + if err = r.Init(); err != nil { + t.Fatal(err) + } + if err = r.Connect(ctx); err != nil { + t.Logf("connect failed %v", err) + } + + for { + if err = r.Write(ctx, "mykey", "myval"); err != nil { + t.Logf("failed to write %v", err) + } + } +} + func TestKeepTTL(t *testing.T) { ctx := context.Background() diff --git a/tracer.go b/tracer.go index 4936600..a038dab 100644 --- a/tracer.go +++ b/tracer.go @@ -91,7 +91,7 @@ func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) gore _, cmdsString := rediscmd.CmdsString(cmds) opts := append(h.opts, tracer.WithSpanLabels( - "db.goredis.num_cmd", strconv.Itoa(len(cmds)), + "db.database.num_cmd", strconv.Itoa(len(cmds)), "db.statement", cmdsString, ))