add tracer support
Some checks failed
build / test (push) Failing after 1m32s
codeql / analyze (go) (push) Failing after 1m35s
build / lint (push) Successful in 9m14s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-04-14 22:40:30 +03:00
parent 1e8a44b088
commit 741b2310ec

View File

@ -10,6 +10,7 @@ import (
redis "github.com/redis/go-redis/v9"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/store"
"go.unistack.org/micro/v3/tracer"
pool "go.unistack.org/micro/v3/util/xpool"
)
@ -97,7 +98,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
}
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey)
ctx, sp := r.opts.Tracer.Start(ctx, "cache exists "+rkey)
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
@ -113,6 +114,7 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -134,9 +136,13 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
defer cancel()
}
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey)
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
ts := time.Now()
buf, err := r.cli.Get(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Bytes()
buf, err := r.cli.Get(ctx, rkey).Bytes()
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
@ -147,6 +153,7 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -157,7 +164,9 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
case *string:
*b = string(buf)
default:
err = r.opts.Codec.Unmarshal(buf, val)
if err = r.opts.Codec.Unmarshal(buf, val); err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
}
return err
@ -183,6 +192,9 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
}
}
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mread %v", keys))
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
ts := time.Now()
rvals, err := r.cli.MGet(ctx, keys...).Result()
@ -196,6 +208,7 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -237,10 +250,12 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
itm.Set(reflect.New(vt.Elem()))
if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
return err
}
}
vv.Set(nvv)
return nil
}
@ -264,6 +279,9 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
}
}
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mdelete %v", keys))
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
ts := time.Now()
err := r.cli.Del(ctx, keys...).Err()
@ -277,6 +295,7 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -298,6 +317,9 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
defer cancel()
}
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache delete %v", key))
defer sp.Finish()
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
ts := time.Now()
err := r.cli.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
@ -311,6 +333,7 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -332,6 +355,9 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
defer cancel()
}
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mwrite %v", keys))
defer sp.Finish()
kvs := make([]string, 0, len(keys)*2)
for idx, key := range keys {
@ -345,6 +371,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
default:
buf, err := r.opts.Codec.Marshal(vt)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
return err
}
kvs = append(kvs, string(buf))
@ -373,6 +400,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -383,6 +411,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc()
return store.ErrNotFound
}
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -405,6 +434,10 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
defer cancel()
}
rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache write %v", rkey))
defer sp.Finish()
var buf []byte
switch vt := val.(type) {
case string:
@ -415,13 +448,14 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
var err error
buf, err = r.opts.Codec.Marshal(val)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
return err
}
}
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
ts := time.Now()
err := r.cli.Set(ctx, r.getKey(r.opts.Namespace, options.Namespace, key), buf, options.TTL).Err()
err := r.cli.Set(ctx, rkey, buf, options.TTL).Err()
te := time.Since(ts)
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec()
r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
@ -432,6 +466,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return err
}
@ -461,6 +496,9 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
defer cancel()
}
ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache list %v", rkey))
defer sp.Finish()
// TODO: add support for prefix/suffix/limit
r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc()
ts := time.Now()
@ -475,6 +513,7 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
} else if err == nil {
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc()
} else if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc()
return nil, err
}