From 87e2e2b947248c8af48f7312c492e0bf2bbb65cc Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 5 Oct 2024 16:11:46 +0300 Subject: [PATCH] switch to universal client Signed-off-by: Vasiliy Tolstov --- go.mod | 14 ++- go.sum | 31 +++++ options.go | 53 +++----- redis.go | 325 +++++++++++++++++++++++++++----------------------- redis_test.go | 3 +- stats.go | 17 +-- tracer.go | 38 +++--- 7 files changed, 261 insertions(+), 220 deletions(-) diff --git a/go.mod b/go.mod index 73df59d..b3d7387 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,19 @@ toolchain go1.22.4 require ( github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 - github.com/redis/go-redis/v9 v9.5.3 - go.unistack.org/micro/v3 v3.10.80 + github.com/redis/go-redis/v9 v9.6.1 + go.unistack.org/micro/v3 v3.10.95 ) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect + dario.cat/mergo v1.0.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 // indirect + go.unistack.org/micro-proto/v3 v3.4.1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect + google.golang.org/grpc v1.57.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index db7a5ae..4255ca2 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,45 @@ +dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ= github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= +github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= +go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= go.unistack.org/micro/v3 v3.10.80 h1:A0zWNoM9MOcMg9gdFFgVkgbT3uSYVIINhuvumX9nP2o= go.unistack.org/micro/v3 v3.10.80/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +go.unistack.org/micro/v3 v3.10.94 h1:t9J+vw6rNOQdEPFPeUvhyxlUyL+LEt9dTV/bjSEaxMI= +go.unistack.org/micro/v3 v3.10.94/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +go.unistack.org/micro/v3 v3.10.95 h1:KQse+ZpntbuzA8cH6Pz7CdvWYVdKzcR34gjnkDrHiso= +go.unistack.org/micro/v3 v3.10.95/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go index b973f8d..4e6ab6e 100644 --- a/options.go +++ b/options.go @@ -1,9 +1,7 @@ package redis import ( - "time" - - "github.com/redis/go-redis/v9" + goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/store" @@ -12,35 +10,34 @@ import ( type configKey struct{} -func Config(c *redis.Options) store.Option { +func Config(c *goredis.Options) store.Option { return store.SetOption(configKey{}, c) } type clusterConfigKey struct{} -func ClusterConfig(c *redis.ClusterOptions) store.Option { +func ClusterConfig(c *goredis.ClusterOptions) store.Option { return store.SetOption(clusterConfigKey{}, c) } -var ( - // DefaultMeterStatsInterval holds default stats interval - DefaultMeterStatsInterval = 5 * time.Second - // DefaultMeterMetricPrefix holds default metric prefix - DefaultMeterMetricPrefix = "micro_store_" +type universalConfigKey struct{} +func UniversalConfig(c *goredis.UniversalOptions) store.Option { + return store.SetOption(universalConfigKey{}, c) +} + +var ( labelHost = "redis_host" labelName = "redis_name" ) // Options struct holds wrapper options type Options struct { - Logger logger.Logger - Meter meter.Meter - Tracer tracer.Tracer - MeterMetricPrefix string - MeterStatsInterval time.Duration - RedisHost string - RedisName string + Logger logger.Logger + Meter meter.Meter + Tracer tracer.Tracer + RedisHost string + RedisName string } // Option func signature @@ -49,11 +46,9 @@ type Option func(*Options) // NewOptions create new Options struct from provided option slice func NewOptions(opts ...Option) Options { options := Options{ - Logger: logger.DefaultLogger, - Meter: meter.DefaultMeter, - Tracer: tracer.DefaultTracer, - MeterStatsInterval: DefaultMeterStatsInterval, - MeterMetricPrefix: DefaultMeterMetricPrefix, + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, } for _, o := range opts { @@ -70,17 +65,3 @@ func NewOptions(opts ...Option) Options { return options } - -// MetricInterval specifies stats interval for *sql.DB -func MetricInterval(td time.Duration) Option { - return func(o *Options) { - o.MeterStatsInterval = td - } -} - -// MetricPrefix specifies prefix for each metric -func MetricPrefix(pref string) Option { - return func(o *Options) { - o.MeterMetricPrefix = pref - } -} diff --git a/redis.go b/redis.go index de45f47..d8e7946 100755 --- a/redis.go +++ b/redis.go @@ -2,12 +2,11 @@ package redis import ( "context" - "fmt" "reflect" "strings" "time" - redis "github.com/redis/go-redis/v9" + goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/store" pool "go.unistack.org/micro/v3/util/xpool" @@ -16,7 +15,7 @@ import ( var ( DefaultPathSeparator = "/" - DefaultClusterOptions = &redis.ClusterOptions{ + DefaultUniversalOptions = &goredis.UniversalOptions{ Username: "", Password: "", // no password set MaxRetries: 2, @@ -28,7 +27,19 @@ var ( MinIdleConns: 10, } - DefaultOptions = &redis.Options{ + DefaultClusterOptions = &goredis.ClusterOptions{ + Username: "", + Password: "", // no password set + MaxRetries: 2, + MaxRetryBackoff: 256 * time.Millisecond, + DialTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + PoolTimeout: 1 * time.Second, + MinIdleConns: 10, + } + + DefaultOptions = &goredis.Options{ Username: "", Password: "", // no password set DB: 0, // use default DB @@ -44,22 +55,16 @@ var ( type Store struct { opts store.Options - cli *wrappedClient + cli goredis.UniversalClient done chan struct{} - pool pool.Pool[*strings.Builder] -} - -type wrappedClient struct { - *redis.Client - *redis.ClusterClient + pool *pool.StringsPool } func (r *Store) Connect(ctx context.Context) error { - var err error - if r.cli.Client != nil { - err = r.cli.Client.Ping(ctx).Err() + if r.cli == nil { + return store.ErrNotConnected } - err = r.cli.ClusterClient.Ping(ctx).Err() + err := r.cli.Ping(ctx).Err() setSpanError(ctx, err) return err } @@ -77,16 +82,20 @@ func (r *Store) Init(opts ...store.Option) error { return nil } -func (r *Store) Client() *redis.Client { - if r.cli.Client != nil { - return r.cli.Client +func (r *Store) Client() *goredis.Client { + if c, ok := r.cli.(*goredis.Client); ok { + return c } return nil } -func (r *Store) ClusterClient() *redis.ClusterClient { - if r.cli.ClusterClient != nil { - return r.cli.ClusterClient +func (r *Store) UniversalClient() goredis.UniversalClient { + return r.cli +} + +func (r *Store) ClusterClient() *goredis.ClusterClient { + if c, ok := r.cli.(*goredis.ClusterClient); ok { + return c } return nil } @@ -97,10 +106,8 @@ func (r *Store) Disconnect(ctx context.Context) error { case <-r.done: return err default: - if r.cli.Client != nil { - err = r.cli.Client.Close() - } else if r.cli.ClusterClient != nil { - err = r.cli.ClusterClient.Close() + if r.cli != nil { + err = r.cli.Close() } close(r.done) return err @@ -108,6 +115,8 @@ func (r *Store) Disconnect(ctx context.Context) error { } func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error { + b := r.pool.Get() + defer r.pool.Put(b) options := store.NewExistsOptions(opts...) timeout := r.opts.Timeout @@ -121,23 +130,17 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti defer cancel() } - rkey := r.getKey(r.opts.Namespace, options.Namespace, key) + rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() ts := time.Now() - var err error - var val int64 - if r.cli.Client != nil { - val, err = r.cli.Client.Exists(ctx, rkey).Result() - } else { - val, err = r.cli.ClusterClient.Exists(ctx, rkey).Result() - } + val, err := r.cli.Exists(ctx, rkey).Result() setSpanError(ctx, err) te := time.Since(ts) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil || (err == nil && val == 0) { + if err == goredis.Nil || (err == nil && val == 0) { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -151,6 +154,9 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti } func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { + b := r.pool.Get() + defer r.pool.Put(b) + options := store.NewReadOptions(opts...) timeout := r.opts.Timeout @@ -164,23 +170,17 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s defer cancel() } - rkey := r.getKey(r.opts.Namespace, options.Namespace, key) + rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() ts := time.Now() - var buf []byte - var err error - if r.cli.Client != nil { - buf, err = r.cli.Client.Get(ctx, rkey).Bytes() - } else { - buf, err = r.cli.ClusterClient.Get(ctx, rkey).Bytes() - } + buf, err := r.cli.Get(ctx, rkey).Bytes() setSpanError(ctx, err) te := time.Since(ts) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil || (err == nil && buf == nil) { + if err == goredis.Nil || (err == nil && buf == nil) { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -219,10 +219,14 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts } var rkeys []string + var pools []*strings.Builder if r.opts.Namespace != "" || options.Namespace != "" { rkeys = make([]string, len(keys)) + pools = make([]*strings.Builder, len(keys)) for idx, key := range keys { - rkeys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key) + b := r.pool.Get() + pools[idx] = b + rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key) } } @@ -231,24 +235,19 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts var rvals []interface{} var err error if r.opts.Namespace != "" || options.Namespace != "" { - if r.cli.Client != nil { - rvals, err = r.cli.Client.MGet(ctx, rkeys...).Result() - } else { - rvals, err = r.cli.ClusterClient.MGet(ctx, rkeys...).Result() + rvals, err = r.cli.MGet(ctx, rkeys...).Result() + for idx := range pools { + r.pool.Put(pools[idx]) } } else { - if r.cli.Client != nil { - rvals, err = r.cli.Client.MGet(ctx, keys...).Result() - } else { - rvals, err = r.cli.ClusterClient.MGet(ctx, keys...).Result() - } + rvals, err = r.cli.MGet(ctx, keys...).Result() } setSpanError(ctx, err) te := time.Since(ts) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil || (len(rvals) == 0) { + if err == goredis.Nil || (len(rvals) == 0) { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -319,10 +318,14 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } var rkeys []string + var pools []*strings.Builder if r.opts.Namespace != "" || options.Namespace != "" { rkeys = make([]string, len(keys)) + pools = make([]*strings.Builder, len(keys)) for idx, key := range keys { - rkeys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key) + b := r.pool.Get() + pools[idx] = b + rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key) } } @@ -330,24 +333,19 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete ts := time.Now() var err error if r.opts.Namespace != "" || options.Namespace != "" { - if r.cli.Client != nil { - err = r.cli.Client.Del(ctx, rkeys...).Err() - } else { - err = r.cli.ClusterClient.Del(ctx, rkeys...).Err() + err = r.cli.Del(ctx, rkeys...).Err() + for idx := range pools { + r.pool.Put(pools[idx]) } } else { - if r.cli.Client != nil { - err = r.cli.Client.Del(ctx, keys...).Err() - } else { - err = r.cli.ClusterClient.Del(ctx, keys...).Err() - } + err = r.cli.Del(ctx, keys...).Err() } setSpanError(ctx, err) te := time.Since(ts) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil { + if err == goredis.Nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -361,6 +359,9 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete } func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { + b := r.pool.Get() + defer r.pool.Put(b) + options := store.NewDeleteOptions(opts...) timeout := r.opts.Timeout @@ -376,18 +377,13 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() ts := time.Now() - var err error - if r.cli.Client != nil { - err = r.cli.Client.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err() - } else { - err = r.cli.ClusterClient.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err() - } - setSpanError(ctx, err) + err := r.cli.Del(ctx, r.getKey(b, r.opts.Namespace, options.Namespace, key)).Err() te := time.Since(ts) + setSpanError(ctx, err) r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil { + if err == goredis.Nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -415,9 +411,11 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } kvs := make([]string, 0, len(keys)*2) - + pools := make([]*strings.Builder, len(keys)) for idx, key := range keys { - kvs = append(kvs, r.getKey(r.opts.Namespace, options.Namespace, key)) + b := r.pool.Get() + pools[idx] = b + kvs = append(kvs, r.getKey(b, r.opts.Namespace, options.Namespace, key)) switch vt := vals[idx].(type) { case string: @@ -434,9 +432,8 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() - ts := time.Now() - pipeliner := func(pipe redis.Pipeliner) error { + pipeliner := func(pipe goredis.Pipeliner) error { for idx := 0; idx < len(kvs); idx += 2 { if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil { setSpanError(ctx, err) @@ -446,20 +443,18 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o return nil } - var err error - var cmds []redis.Cmder - - if r.cli.Client != nil { - cmds, err = r.cli.Client.Pipelined(ctx, pipeliner) - } else { - cmds, err = r.cli.ClusterClient.Pipelined(ctx, pipeliner) + ts := time.Now() + cmds, err := r.cli.Pipelined(ctx, pipeliner) + for idx := range pools { + r.pool.Put(pools[idx]) } - setSpanError(ctx, err) te := time.Since(ts) + setSpanError(ctx, err) + r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil { + if err == goredis.Nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -471,7 +466,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o for _, cmd := range cmds { if err = cmd.Err(); err != nil { - if err == redis.Nil { + if err == goredis.Nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } @@ -485,6 +480,9 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { + b := r.pool.Get() + defer r.pool.Put(b) + options := store.NewWriteOptions(opts...) timeout := r.opts.Timeout @@ -498,7 +496,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... defer cancel() } - rkey := r.getKey(r.opts.Namespace, options.Namespace, key) + rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) var buf []byte switch vt := val.(type) { @@ -516,18 +514,14 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() ts := time.Now() - var err error - if r.cli.Client != nil { - err = r.cli.Client.Set(ctx, rkey, buf, options.TTL).Err() - } else { - err = r.cli.ClusterClient.Set(ctx, rkey, buf, options.TTL).Err() - } - setSpanError(ctx, err) + err := r.cli.Set(ctx, rkey, buf, options.TTL).Err() te := time.Since(ts) + setSpanError(ctx, err) + r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil { + if err == goredis.Nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound } else if err == nil { @@ -541,12 +535,15 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... } func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) { + b := r.pool.Get() + defer r.pool.Put(b) + options := store.NewListOptions(opts...) if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } - rkey := r.getKey(options.Namespace, "", options.Prefix+"*") + rkey := r.getKey(b, options.Namespace, "", options.Prefix+"*") if options.Suffix != "" { rkey += options.Suffix } @@ -568,10 +565,8 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e var keys []string var err error - if r.cli.Client != nil { - keys, err = r.cli.Client.Keys(ctx, rkey).Result() - } else { - err = r.cli.ClusterClient.ForEachMaster(ctx, func(nctx context.Context, cli *redis.Client) error { + if c, ok := r.cli.(*goredis.ClusterClient); ok { + err = c.ForEachMaster(ctx, func(nctx context.Context, cli *goredis.Client) error { nkeys, nerr := cli.Keys(nctx, rkey).Result() if nerr != nil { return nerr @@ -579,13 +574,16 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e keys = append(keys, nkeys...) return nil }) + } else { + keys, err = r.cli.Keys(ctx, rkey).Result() } - setSpanError(ctx, err) te := time.Since(ts) + setSpanError(ctx, err) + r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) - if err == redis.Nil { + if err == goredis.Nil { r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return nil, store.ErrNotFound } else if err == nil { @@ -627,79 +625,106 @@ func NewStore(opts ...store.Option) *Store { } func (r *Store) configure() error { - var redisOptions *redis.Options - var redisClusterOptions *redis.ClusterOptions - var err error - - nodes := r.opts.Addrs - - if len(nodes) == 0 { - nodes = []string{"redis://127.0.0.1:6379"} - } + var universalOptions *goredis.UniversalOptions if r.cli != nil && r.opts.Context == nil { return nil } if r.opts.Context != nil { - if c, ok := r.opts.Context.Value(configKey{}).(*redis.Options); ok { - redisOptions = c + if o, ok := r.opts.Context.Value(configKey{}).(*goredis.Options); ok { + universalOptions.Addrs = []string{o.Addr} + universalOptions.Dialer = o.Dialer + universalOptions.OnConnect = o.OnConnect + universalOptions.Username = o.Username + universalOptions.Password = o.Password + + universalOptions.MaxRetries = o.MaxRetries + universalOptions.MinRetryBackoff = o.MinRetryBackoff + universalOptions.MaxRetryBackoff = o.MaxRetryBackoff + + universalOptions.DialTimeout = o.DialTimeout + universalOptions.ReadTimeout = o.ReadTimeout + universalOptions.WriteTimeout = o.WriteTimeout + universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled + + universalOptions.PoolFIFO = o.PoolFIFO + + universalOptions.PoolSize = o.PoolSize + universalOptions.PoolTimeout = o.PoolTimeout + universalOptions.MinIdleConns = o.MinIdleConns + universalOptions.MaxIdleConns = o.MaxIdleConns + universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime + universalOptions.ConnMaxLifetime = o.ConnMaxLifetime + if r.opts.TLSConfig != nil { - redisOptions.TLSConfig = r.opts.TLSConfig + universalOptions.TLSConfig = r.opts.TLSConfig } } - if c, ok := r.opts.Context.Value(clusterConfigKey{}).(*redis.ClusterOptions); ok { - redisClusterOptions = c + if o, ok := r.opts.Context.Value(clusterConfigKey{}).(*goredis.ClusterOptions); ok { + universalOptions.Addrs = o.Addrs + universalOptions.Dialer = o.Dialer + universalOptions.OnConnect = o.OnConnect + universalOptions.Username = o.Username + universalOptions.Password = o.Password + + universalOptions.MaxRedirects = o.MaxRedirects + universalOptions.ReadOnly = o.ReadOnly + universalOptions.RouteByLatency = o.RouteByLatency + universalOptions.RouteRandomly = o.RouteRandomly + + universalOptions.MaxRetries = o.MaxRetries + universalOptions.MinRetryBackoff = o.MinRetryBackoff + universalOptions.MaxRetryBackoff = o.MaxRetryBackoff + + universalOptions.DialTimeout = o.DialTimeout + universalOptions.ReadTimeout = o.ReadTimeout + universalOptions.WriteTimeout = o.WriteTimeout + universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled + + universalOptions.PoolFIFO = o.PoolFIFO + + universalOptions.PoolSize = o.PoolSize + universalOptions.PoolTimeout = o.PoolTimeout + universalOptions.MinIdleConns = o.MinIdleConns + universalOptions.MaxIdleConns = o.MaxIdleConns + universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime + universalOptions.ConnMaxLifetime = o.ConnMaxLifetime if r.opts.TLSConfig != nil { - redisClusterOptions.TLSConfig = r.opts.TLSConfig + universalOptions.TLSConfig = r.opts.TLSConfig + } + } + + if o, ok := r.opts.Context.Value(universalConfigKey{}).(*goredis.UniversalOptions); ok { + universalOptions = o + if r.opts.TLSConfig != nil { + universalOptions.TLSConfig = r.opts.TLSConfig } } } - if redisOptions != nil && redisClusterOptions != nil { - return fmt.Errorf("must specify only one option Config or ClusterConfig") + if universalOptions == nil { + universalOptions = DefaultUniversalOptions } - if redisOptions == nil && redisClusterOptions == nil && r.cli != nil { - return nil + if len(r.opts.Addrs) > 0 { + universalOptions.Addrs = r.opts.Addrs + } else { + universalOptions.Addrs = []string{"redis://127.0.0.1:6379"} } - if redisOptions == nil && redisClusterOptions == nil && len(nodes) == 1 { - redisOptions, err = redis.ParseURL(nodes[0]) - if err != nil { - redisOptions = DefaultOptions - redisOptions.Addr = r.opts.Addrs[0] - redisOptions.TLSConfig = r.opts.TLSConfig - } - } else if redisOptions == nil && redisClusterOptions == nil && len(nodes) > 1 { - redisClusterOptions = DefaultClusterOptions - redisClusterOptions.Addrs = r.opts.Addrs - redisClusterOptions.TLSConfig = r.opts.TLSConfig - } + r.cli = goredis.NewUniversalClient(universalOptions) + setTracing(r.cli, r.opts.Tracer) - if redisOptions != nil { - c := redis.NewClient(redisOptions) - setTracing(c, r.opts.Tracer) - r.cli = &wrappedClient{Client: c} - } else if redisClusterOptions != nil { - c := redis.NewClusterClient(redisClusterOptions) - setTracing(c, r.opts.Tracer) - r.cli = &wrappedClient{ClusterClient: c} - } - - r.pool = pool.NewPool(func() *strings.Builder { return &strings.Builder{} }) + r.pool = pool.NewStringsPool(50) r.statsMeter() return nil } -func (r *Store) getKey(mainNamespace string, opNamespace string, key string) string { - b := r.pool.Get() - defer r.pool.Put(b) - b.Reset() - +func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace string, key string) string { if opNamespace == "" { opNamespace = mainNamespace } diff --git a/redis_test.go b/redis_test.go index 0bd9a24..4fa7a20 100755 --- a/redis_test.go +++ b/redis_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/store" "go.unistack.org/micro/v3/tracer" ) @@ -41,7 +42,7 @@ func TestKeepTTL(t *testing.T) { func Test_rkv_configure(t *testing.T) { type fields struct { options store.Options - Client *wrappedClient + Client goredis.UniversalClient } type wantValues struct { username string diff --git a/stats.go b/stats.go index fd07b20..3aeba4c 100644 --- a/stats.go +++ b/stats.go @@ -3,7 +3,8 @@ package redis import ( "time" - "github.com/redis/go-redis/v9" + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/meter" ) var ( @@ -13,29 +14,23 @@ var ( PoolConnTotalCurrent = "pool_conn_total_current" PoolConnIdleCurrent = "pool_conn_idle_current" PoolConnStaleTotal = "pool_conn_stale_total" - - meterRequestTotal = "request_total" - meterRequestLatencyMicroseconds = "latency_microseconds" - meterRequestDurationSeconds = "request_duration_seconds" ) type Statser interface { - PoolStats() *redis.PoolStats + PoolStats() *goredis.PoolStats } func (r *Store) statsMeter() { var st Statser - if r.cli.Client != nil { - st = r.cli.Client - } else if r.cli.ClusterClient != nil { - st = r.cli.ClusterClient + if r.cli != nil { + st = r.cli } else { return } go func() { - ticker := time.NewTicker(DefaultMeterStatsInterval) + ticker := time.NewTicker(meter.DefaultMeterStatsInterval) defer ticker.Stop() for { diff --git a/tracer.go b/tracer.go index bddbcaa..9df06fe 100644 --- a/tracer.go +++ b/tracer.go @@ -7,24 +7,24 @@ import ( "strconv" rediscmd "github.com/redis/go-redis/extra/rediscmd/v9" - "github.com/redis/go-redis/v9" + goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v3/tracer" ) -func setTracing(rdb redis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) { +func setTracing(rdb goredis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) { switch rdb := rdb.(type) { - case *redis.Client: + case *goredis.Client: opt := rdb.Options() connString := formatDBConnString(opt.Network, opt.Addr) rdb.AddHook(newTracingHook(connString, tr)) - case *redis.ClusterClient: - rdb.OnNewNode(func(rdb *redis.Client) { + case *goredis.ClusterClient: + rdb.OnNewNode(func(rdb *goredis.Client) { opt := rdb.Options() connString := formatDBConnString(opt.Network, opt.Addr) rdb.AddHook(newTracingHook(connString, tr)) }) - case *redis.Ring: - rdb.OnNewNode(func(rdb *redis.Client) { + case *goredis.Ring: + rdb.OnNewNode(func(rdb *goredis.Client) { opt := rdb.Options() connString := formatDBConnString(opt.Network, opt.Addr) rdb.AddHook(newTracingHook(connString, tr)) @@ -37,7 +37,7 @@ type tracingHook struct { opts []tracer.SpanOption } -var _ redis.Hook = (*tracingHook)(nil) +var _ goredis.Hook = (*tracingHook)(nil) func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook { opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient)) @@ -51,10 +51,10 @@ func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOpti } } -func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook { +func (h *tracingHook) DialHook(hook goredis.DialHook) goredis.DialHook { return func(ctx context.Context, network, addr string) (net.Conn, error) { /* - _, span := h.tr.Start(ctx, "redis.dial", h.opts...) + _, span := h.tr.Start(ctx, "goredis.dial", h.opts...) defer span.Finish() */ conn, err := hook(ctx, network, addr) @@ -64,8 +64,8 @@ func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook { } } -func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook { - return func(ctx context.Context, cmd redis.Cmder) error { +func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { + return func(ctx context.Context, cmd goredis.Cmder) error { cmdString := rediscmd.CmdString(cmd) var err error @@ -73,7 +73,7 @@ func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook { case "cluster slots": break default: - _, span := h.tr.Start(ctx, "redis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) + _, span := h.tr.Start(ctx, "goredis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) defer func() { recordError(span, err) span.Finish() @@ -86,16 +86,16 @@ func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook { } } -func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook { - return func(ctx context.Context, cmds []redis.Cmder) error { +func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook { + return func(ctx context.Context, cmds []goredis.Cmder) error { _, cmdsString := rediscmd.CmdsString(cmds) opts := append(h.opts, tracer.WithSpanLabels( - "db.redis.num_cmd", strconv.Itoa(len(cmds)), + "db.goredis.num_cmd", strconv.Itoa(len(cmds)), "db.statement", cmdsString, )) - _, span := h.tr.Start(ctx, "redis.process_pipeline", opts...) + _, span := h.tr.Start(ctx, "goredis.process_pipeline", opts...) defer span.Finish() err := hook(ctx, cmds) @@ -106,7 +106,7 @@ func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis. } func setSpanError(ctx context.Context, err error) { - if err == nil || err == redis.Nil { + if err == nil || err == goredis.Nil { return } if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil { @@ -115,7 +115,7 @@ func setSpanError(ctx context.Context, err error) { } func recordError(span tracer.Span, err error) { - if err != nil && err != redis.Nil { + if err != nil && err != goredis.Nil { span.SetStatus(tracer.SpanStatusError, err.Error()) } }