From 741b2310ecb967f032900bd6df011704a91ee11e Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 14 Apr 2024 22:40:30 +0300 Subject: [PATCH] add tracer support Signed-off-by: Vasiliy Tolstov --- redis.go | 47 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/redis.go b/redis.go index 55e6f96..ca5077d 100755 --- a/redis.go +++ b/redis.go @@ -10,6 +10,7 @@ import ( redis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/store" + "go.unistack.org/micro/v3/tracer" pool "go.unistack.org/micro/v3/util/xpool" ) @@ -97,7 +98,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti } rkey := r.getKey(r.opts.Namespace, options.Namespace, key) - ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey) + ctx, sp := r.opts.Tracer.Start(ctx, "cache exists "+rkey) defer sp.Finish() r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() @@ -113,6 +114,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -134,9 +136,13 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s defer cancel() } + rkey := r.getKey(r.opts.Namespace, options.Namespace, key) + ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey) + defer sp.Finish() + r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() ts := time.Now() - buf, err := r.cli.Get(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Bytes() + buf, err := r.cli.Get(ctx, rkey).Bytes() te := time.Since(ts) r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) @@ -147,6 +153,7 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -157,7 +164,9 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s case *string: *b = string(buf) default: - err = r.opts.Codec.Unmarshal(buf, val) + if err = r.opts.Codec.Unmarshal(buf, val); err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } } return err @@ -183,6 +192,9 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts } } + ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mread %v", keys)) + defer sp.Finish() + r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() ts := time.Now() rvals, err := r.cli.MGet(ctx, keys...).Result() @@ -196,6 +208,7 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -237,10 +250,12 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts itm.Set(reflect.New(vt.Elem())) if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) return err } } vv.Set(nvv) + return nil } @@ -264,6 +279,9 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } } + ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mdelete %v", keys)) + defer sp.Finish() + r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() ts := time.Now() err := r.cli.Del(ctx, keys...).Err() @@ -277,6 +295,7 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -298,6 +317,9 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti defer cancel() } + ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache delete %v", key)) + defer sp.Finish() + r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() ts := time.Now() err := r.cli.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err() @@ -311,6 +333,7 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -332,6 +355,9 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o defer cancel() } + ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mwrite %v", keys)) + defer sp.Finish() + kvs := make([]string, 0, len(keys)*2) for idx, key := range keys { @@ -345,6 +371,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o default: buf, err := r.opts.Codec.Marshal(vt) if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) return err } kvs = append(kvs, string(buf)) @@ -373,6 +400,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -383,6 +411,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -405,6 +434,10 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... defer cancel() } + rkey := r.getKey(r.opts.Namespace, options.Namespace, key) + ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache write %v", rkey)) + defer sp.Finish() + var buf []byte switch vt := val.(type) { case string: @@ -415,13 +448,14 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... var err error buf, err = r.opts.Codec.Marshal(val) if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) return err } } r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() ts := time.Now() - err := r.cli.Set(ctx, r.getKey(r.opts.Namespace, options.Namespace, key), buf, options.TTL).Err() + err := r.cli.Set(ctx, rkey, buf, options.TTL).Err() te := time.Since(ts) r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) @@ -432,6 +466,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return err } @@ -461,6 +496,9 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e defer cancel() } + ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache list %v", rkey)) + defer sp.Finish() + // TODO: add support for prefix/suffix/limit r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() ts := time.Now() @@ -475,6 +513,7 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e } else if err == nil { r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() } else if err != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() return nil, err }