Compare commits
	
		
			17 Commits
		
	
	
		
			v3.10.11
			...
			9c3d0a6a6e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 9c3d0a6a6e | |||
| 15d7ad156f | |||
| 0981f89f60 | |||
| 332fe5f4d4 | |||
| 757fe0245b | |||
| 27eccc1ed2 | |||
| 7c641fa8ac | |||
| 24c9f20196 | |||
| 953b5b0021 | |||
| 87e2e2b947 | |||
| 256e61a437 | |||
| f9cdd41c94 | |||
| ecad15fe17 | |||
| fa3d18b353 | |||
| 2f3951773f | |||
| b263e14032 | |||
| 518cc1db73 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@@ -0,0 +1 @@
 | 
			
		||||
.idea
 | 
			
		||||
							
								
								
									
										13
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								go.mod
									
									
									
									
									
								
							@@ -1,16 +1,19 @@
 | 
			
		||||
module go.unistack.org/micro-store-redis/v3
 | 
			
		||||
 | 
			
		||||
go 1.21
 | 
			
		||||
go 1.22
 | 
			
		||||
 | 
			
		||||
toolchain go1.22.4
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
 | 
			
		||||
	github.com/redis/go-redis/v9 v9.5.3
 | 
			
		||||
	go.unistack.org/micro/v3 v3.10.80
 | 
			
		||||
	github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0
 | 
			
		||||
	github.com/redis/go-redis/v9 v9.7.0
 | 
			
		||||
	go.unistack.org/micro/v3 v3.10.105
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 | 
			
		||||
	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 | 
			
		||||
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 | 
			
		||||
	github.com/google/go-cmp v0.6.0 // indirect
 | 
			
		||||
	go.unistack.org/micro-proto/v3 v3.4.1 // indirect
 | 
			
		||||
	google.golang.org/protobuf v1.35.2 // indirect
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										22
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								go.sum
									
									
									
									
									
								
							@@ -2,13 +2,19 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
 | 
			
		||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
 | 
			
		||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
 | 
			
		||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 | 
			
		||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 | 
			
		||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 | 
			
		||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 | 
			
		||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20=
 | 
			
		||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ=
 | 
			
		||||
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
 | 
			
		||||
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.80 h1:A0zWNoM9MOcMg9gdFFgVkgbT3uSYVIINhuvumX9nP2o=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.80/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
 | 
			
		||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 | 
			
		||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 | 
			
		||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk=
 | 
			
		||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik=
 | 
			
		||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
 | 
			
		||||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
 | 
			
		||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
 | 
			
		||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.105 h1:JYNV0d+fnR7Hy8d4/sjr+25DbSNqq1Z7IPeDDdB+f1I=
 | 
			
		||||
go.unistack.org/micro/v3 v3.10.105/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g=
 | 
			
		||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
 | 
			
		||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										58
									
								
								options.go
									
									
									
									
									
								
							
							
						
						
									
										58
									
								
								options.go
									
									
									
									
									
								
							@@ -1,9 +1,7 @@
 | 
			
		||||
package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	goredis "github.com/redis/go-redis/v9"
 | 
			
		||||
	"go.unistack.org/micro/v3/logger"
 | 
			
		||||
	"go.unistack.org/micro/v3/meter"
 | 
			
		||||
	"go.unistack.org/micro/v3/store"
 | 
			
		||||
@@ -12,30 +10,34 @@ import (
 | 
			
		||||
 | 
			
		||||
type configKey struct{}
 | 
			
		||||
 | 
			
		||||
func Config(c *redis.Options) store.Option {
 | 
			
		||||
func Config(c *goredis.Options) store.Option {
 | 
			
		||||
	return store.SetOption(configKey{}, c)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type clusterConfigKey struct{}
 | 
			
		||||
 | 
			
		||||
func ClusterConfig(c *redis.ClusterOptions) store.Option {
 | 
			
		||||
func ClusterConfig(c *goredis.ClusterOptions) store.Option {
 | 
			
		||||
	return store.SetOption(clusterConfigKey{}, c)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type universalConfigKey struct{}
 | 
			
		||||
 | 
			
		||||
func UniversalConfig(c *goredis.UniversalOptions) store.Option {
 | 
			
		||||
	return store.SetOption(universalConfigKey{}, c)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	// DefaultMeterStatsInterval holds default stats interval
 | 
			
		||||
	DefaultMeterStatsInterval = 5 * time.Second
 | 
			
		||||
	// DefaultMeterMetricPrefix holds default metric prefix
 | 
			
		||||
	DefaultMeterMetricPrefix = "micro_store_"
 | 
			
		||||
	labelHost = "redis_host"
 | 
			
		||||
	labelName = "redis_name"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Options struct holds wrapper options
 | 
			
		||||
type Options struct {
 | 
			
		||||
	Logger             logger.Logger
 | 
			
		||||
	Meter              meter.Meter
 | 
			
		||||
	Tracer             tracer.Tracer
 | 
			
		||||
	MeterMetricPrefix  string
 | 
			
		||||
	MeterStatsInterval time.Duration
 | 
			
		||||
	Logger    logger.Logger
 | 
			
		||||
	Meter     meter.Meter
 | 
			
		||||
	Tracer    tracer.Tracer
 | 
			
		||||
	RedisHost string
 | 
			
		||||
	RedisName string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Option func signature
 | 
			
		||||
@@ -44,11 +46,9 @@ type Option func(*Options)
 | 
			
		||||
// NewOptions create new Options struct from provided option slice
 | 
			
		||||
func NewOptions(opts ...Option) Options {
 | 
			
		||||
	options := Options{
 | 
			
		||||
		Logger:             logger.DefaultLogger,
 | 
			
		||||
		Meter:              meter.DefaultMeter,
 | 
			
		||||
		Tracer:             tracer.DefaultTracer,
 | 
			
		||||
		MeterStatsInterval: DefaultMeterStatsInterval,
 | 
			
		||||
		MeterMetricPrefix:  DefaultMeterMetricPrefix,
 | 
			
		||||
		Logger: logger.DefaultLogger,
 | 
			
		||||
		Meter:  meter.DefaultMeter,
 | 
			
		||||
		Tracer: tracer.DefaultTracer,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, o := range opts {
 | 
			
		||||
@@ -56,24 +56,12 @@ func NewOptions(opts ...Option) Options {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	options.Meter = options.Meter.Clone(
 | 
			
		||||
		meter.MetricPrefix(options.MeterMetricPrefix),
 | 
			
		||||
		meter.Labels(
 | 
			
		||||
			labelHost, options.RedisHost,
 | 
			
		||||
			labelName, options.RedisName),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
 | 
			
		||||
	options.Logger = options.Logger.Clone(logger.WithAddCallerSkipCount(1))
 | 
			
		||||
 | 
			
		||||
	return options
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MetricInterval specifies stats interval for *sql.DB
 | 
			
		||||
func MetricInterval(td time.Duration) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.MeterStatsInterval = td
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MetricPrefix specifies prefix for each metric
 | 
			
		||||
func MetricPrefix(pref string) Option {
 | 
			
		||||
	return func(o *Options) {
 | 
			
		||||
		o.MeterMetricPrefix = pref
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										380
									
								
								redis.go
									
									
									
									
									
								
							
							
						
						
									
										380
									
								
								redis.go
									
									
									
									
									
								
							@@ -2,12 +2,12 @@ package redis
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	redis "github.com/redis/go-redis/v9"
 | 
			
		||||
	goredis "github.com/redis/go-redis/v9"
 | 
			
		||||
	"go.unistack.org/micro/v3/semconv"
 | 
			
		||||
	"go.unistack.org/micro/v3/store"
 | 
			
		||||
	pool "go.unistack.org/micro/v3/util/xpool"
 | 
			
		||||
@@ -16,7 +16,7 @@ import (
 | 
			
		||||
var (
 | 
			
		||||
	DefaultPathSeparator = "/"
 | 
			
		||||
 | 
			
		||||
	DefaultClusterOptions = &redis.ClusterOptions{
 | 
			
		||||
	DefaultUniversalOptions = &goredis.UniversalOptions{
 | 
			
		||||
		Username:        "",
 | 
			
		||||
		Password:        "", // no password set
 | 
			
		||||
		MaxRetries:      2,
 | 
			
		||||
@@ -28,7 +28,19 @@ var (
 | 
			
		||||
		MinIdleConns:    10,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	DefaultOptions = &redis.Options{
 | 
			
		||||
	DefaultClusterOptions = &goredis.ClusterOptions{
 | 
			
		||||
		Username:        "",
 | 
			
		||||
		Password:        "", // no password set
 | 
			
		||||
		MaxRetries:      2,
 | 
			
		||||
		MaxRetryBackoff: 256 * time.Millisecond,
 | 
			
		||||
		DialTimeout:     1 * time.Second,
 | 
			
		||||
		ReadTimeout:     1 * time.Second,
 | 
			
		||||
		WriteTimeout:    1 * time.Second,
 | 
			
		||||
		PoolTimeout:     1 * time.Second,
 | 
			
		||||
		MinIdleConns:    10,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	DefaultOptions = &goredis.Options{
 | 
			
		||||
		Username:        "",
 | 
			
		||||
		Password:        "", // no password set
 | 
			
		||||
		DB:              0,  // use default DB
 | 
			
		||||
@@ -43,25 +55,22 @@ var (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Store struct {
 | 
			
		||||
	opts store.Options
 | 
			
		||||
	cli  *wrappedClient
 | 
			
		||||
	done chan struct{}
 | 
			
		||||
	pool pool.Pool[*strings.Builder]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type wrappedClient struct {
 | 
			
		||||
	*redis.Client
 | 
			
		||||
	*redis.ClusterClient
 | 
			
		||||
	opts        store.Options
 | 
			
		||||
	cli         goredis.UniversalClient
 | 
			
		||||
	done        chan struct{}
 | 
			
		||||
	pool        *pool.StringsPool
 | 
			
		||||
	isConnected atomic.Int32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Connect(ctx context.Context) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		err = r.cli.Client.Ping(ctx).Err()
 | 
			
		||||
	if r.cli == nil {
 | 
			
		||||
		return store.ErrNotConnected
 | 
			
		||||
	}
 | 
			
		||||
	err = r.cli.ClusterClient.Ping(ctx).Err()
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	return err
 | 
			
		||||
	if r.opts.LazyConnect {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return r.connect(ctx)
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Init(opts ...store.Option) error {
 | 
			
		||||
@@ -77,16 +86,20 @@ func (r *Store) Init(opts ...store.Option) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Client() *redis.Client {
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		return r.cli.Client
 | 
			
		||||
func (r *Store) Client() *goredis.Client {
 | 
			
		||||
	if c, ok := r.cli.(*goredis.Client); ok {
 | 
			
		||||
		return c
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) ClusterClient() *redis.ClusterClient {
 | 
			
		||||
	if r.cli.ClusterClient != nil {
 | 
			
		||||
		return r.cli.ClusterClient
 | 
			
		||||
func (r *Store) UniversalClient() goredis.UniversalClient {
 | 
			
		||||
	return r.cli
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) ClusterClient() *goredis.ClusterClient {
 | 
			
		||||
	if c, ok := r.cli.(*goredis.ClusterClient); ok {
 | 
			
		||||
		return c
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -97,10 +110,10 @@ func (r *Store) Disconnect(ctx context.Context) error {
 | 
			
		||||
	case <-r.done:
 | 
			
		||||
		return err
 | 
			
		||||
	default:
 | 
			
		||||
		if r.cli.Client != nil {
 | 
			
		||||
			err = r.cli.Client.Close()
 | 
			
		||||
		} else if r.cli.ClusterClient != nil {
 | 
			
		||||
			err = r.cli.ClusterClient.Close()
 | 
			
		||||
		if r.cli != nil {
 | 
			
		||||
			if err = r.cli.Close(); err != nil {
 | 
			
		||||
				r.isConnected.Store(0)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		close(r.done)
 | 
			
		||||
		return err
 | 
			
		||||
@@ -108,6 +121,12 @@ func (r *Store) Disconnect(ctx context.Context) error {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b := r.pool.Get()
 | 
			
		||||
	defer r.pool.Put(b)
 | 
			
		||||
	options := store.NewExistsOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -121,23 +140,17 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
	rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	var err error
 | 
			
		||||
	var val int64
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		val, err = r.cli.Client.Exists(ctx, rkey).Result()
 | 
			
		||||
	} else {
 | 
			
		||||
		val, err = r.cli.ClusterClient.Exists(ctx, rkey).Result()
 | 
			
		||||
	}
 | 
			
		||||
	val, err := r.cli.Exists(ctx, rkey).Result()
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil || (err == nil && val == 0) {
 | 
			
		||||
	if err == goredis.Nil || (err == nil && val == 0) {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -151,6 +164,13 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b := r.pool.Get()
 | 
			
		||||
	defer r.pool.Put(b)
 | 
			
		||||
 | 
			
		||||
	options := store.NewReadOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -164,23 +184,17 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
	rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	var buf []byte
 | 
			
		||||
	var err error
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		buf, err = r.cli.Client.Get(ctx, rkey).Bytes()
 | 
			
		||||
	} else {
 | 
			
		||||
		buf, err = r.cli.ClusterClient.Get(ctx, rkey).Bytes()
 | 
			
		||||
	}
 | 
			
		||||
	buf, err := r.cli.Get(ctx, rkey).Bytes()
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil || (err == nil && buf == nil) {
 | 
			
		||||
	if err == goredis.Nil || (err == nil && buf == nil) {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -205,6 +219,10 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	options := store.NewReadOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -218,9 +236,15 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var rkeys []string
 | 
			
		||||
	var pools []*strings.Builder
 | 
			
		||||
	if r.opts.Namespace != "" || options.Namespace != "" {
 | 
			
		||||
		rkeys = make([]string, len(keys))
 | 
			
		||||
		pools = make([]*strings.Builder, len(keys))
 | 
			
		||||
		for idx, key := range keys {
 | 
			
		||||
			keys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
			b := r.pool.Get()
 | 
			
		||||
			pools[idx] = b
 | 
			
		||||
			rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -228,17 +252,20 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	var rvals []interface{}
 | 
			
		||||
	var err error
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		rvals, err = r.cli.Client.MGet(ctx, keys...).Result()
 | 
			
		||||
	if r.opts.Namespace != "" || options.Namespace != "" {
 | 
			
		||||
		rvals, err = r.cli.MGet(ctx, rkeys...).Result()
 | 
			
		||||
		for idx := range pools {
 | 
			
		||||
			r.pool.Put(pools[idx])
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		rvals, err = r.cli.ClusterClient.MGet(ctx, keys...).Result()
 | 
			
		||||
		rvals, err = r.cli.MGet(ctx, keys...).Result()
 | 
			
		||||
	}
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil || (len(rvals) == 0) {
 | 
			
		||||
	if err == goredis.Nil || (len(rvals) == 0) {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -295,6 +322,10 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	options := store.NewDeleteOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -308,26 +339,35 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var rkeys []string
 | 
			
		||||
	var pools []*strings.Builder
 | 
			
		||||
	if r.opts.Namespace != "" || options.Namespace != "" {
 | 
			
		||||
		rkeys = make([]string, len(keys))
 | 
			
		||||
		pools = make([]*strings.Builder, len(keys))
 | 
			
		||||
		for idx, key := range keys {
 | 
			
		||||
			keys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
			b := r.pool.Get()
 | 
			
		||||
			pools[idx] = b
 | 
			
		||||
			rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	var err error
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		err = r.cli.Client.Del(ctx, keys...).Err()
 | 
			
		||||
	if r.opts.Namespace != "" || options.Namespace != "" {
 | 
			
		||||
		err = r.cli.Del(ctx, rkeys...).Err()
 | 
			
		||||
		for idx := range pools {
 | 
			
		||||
			r.pool.Put(pools[idx])
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		err = r.cli.ClusterClient.Del(ctx, keys...).Err()
 | 
			
		||||
		err = r.cli.Del(ctx, keys...).Err()
 | 
			
		||||
	}
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil {
 | 
			
		||||
	if err == goredis.Nil {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -341,6 +381,13 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b := r.pool.Get()
 | 
			
		||||
	defer r.pool.Put(b)
 | 
			
		||||
 | 
			
		||||
	options := store.NewDeleteOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -356,18 +403,13 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	var err error
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		err = r.cli.Client.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
 | 
			
		||||
	} else {
 | 
			
		||||
		err = r.cli.ClusterClient.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
 | 
			
		||||
	}
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	err := r.cli.Del(ctx, r.getKey(b, r.opts.Namespace, options.Namespace, key)).Err()
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil {
 | 
			
		||||
	if err == goredis.Nil {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -381,6 +423,10 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	options := store.NewWriteOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -395,9 +441,11 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	kvs := make([]string, 0, len(keys)*2)
 | 
			
		||||
 | 
			
		||||
	pools := make([]*strings.Builder, len(keys))
 | 
			
		||||
	for idx, key := range keys {
 | 
			
		||||
		kvs = append(kvs, r.getKey(r.opts.Namespace, options.Namespace, key))
 | 
			
		||||
		b := r.pool.Get()
 | 
			
		||||
		pools[idx] = b
 | 
			
		||||
		kvs = append(kvs, r.getKey(b, r.opts.Namespace, options.Namespace, key))
 | 
			
		||||
 | 
			
		||||
		switch vt := vals[idx].(type) {
 | 
			
		||||
		case string:
 | 
			
		||||
@@ -414,9 +462,8 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
 | 
			
		||||
	pipeliner := func(pipe redis.Pipeliner) error {
 | 
			
		||||
	pipeliner := func(pipe goredis.Pipeliner) error {
 | 
			
		||||
		for idx := 0; idx < len(kvs); idx += 2 {
 | 
			
		||||
			if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil {
 | 
			
		||||
				setSpanError(ctx, err)
 | 
			
		||||
@@ -426,20 +473,18 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	var cmds []redis.Cmder
 | 
			
		||||
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		cmds, err = r.cli.Client.Pipelined(ctx, pipeliner)
 | 
			
		||||
	} else {
 | 
			
		||||
		cmds, err = r.cli.ClusterClient.Pipelined(ctx, pipeliner)
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	cmds, err := r.cli.Pipelined(ctx, pipeliner)
 | 
			
		||||
	for idx := range pools {
 | 
			
		||||
		r.pool.Put(pools[idx])
 | 
			
		||||
	}
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil {
 | 
			
		||||
	if err == goredis.Nil {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -451,7 +496,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
			
		||||
 | 
			
		||||
	for _, cmd := range cmds {
 | 
			
		||||
		if err = cmd.Err(); err != nil {
 | 
			
		||||
			if err == redis.Nil {
 | 
			
		||||
			if err == goredis.Nil {
 | 
			
		||||
				r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
				return store.ErrNotFound
 | 
			
		||||
			}
 | 
			
		||||
@@ -465,6 +510,13 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b := r.pool.Get()
 | 
			
		||||
	defer r.pool.Put(b)
 | 
			
		||||
 | 
			
		||||
	options := store.NewWriteOptions(opts...)
 | 
			
		||||
 | 
			
		||||
	timeout := r.opts.Timeout
 | 
			
		||||
@@ -478,7 +530,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
	rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
			
		||||
 | 
			
		||||
	var buf []byte
 | 
			
		||||
	switch vt := val.(type) {
 | 
			
		||||
@@ -496,18 +548,14 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	var err error
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		err = r.cli.Client.Set(ctx, rkey, buf, options.TTL).Err()
 | 
			
		||||
	} else {
 | 
			
		||||
		err = r.cli.ClusterClient.Set(ctx, rkey, buf, options.TTL).Err()
 | 
			
		||||
	}
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	err := r.cli.Set(ctx, rkey, buf, options.TTL).Err()
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil {
 | 
			
		||||
	if err == goredis.Nil {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -521,12 +569,19 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
 | 
			
		||||
	if err := r.connect(ctx); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b := r.pool.Get()
 | 
			
		||||
	defer r.pool.Put(b)
 | 
			
		||||
 | 
			
		||||
	options := store.NewListOptions(opts...)
 | 
			
		||||
	if len(options.Namespace) == 0 {
 | 
			
		||||
		options.Namespace = r.opts.Namespace
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rkey := r.getKey(options.Namespace, "", options.Prefix+"*")
 | 
			
		||||
	rkey := r.getKey(b, options.Namespace, "", options.Prefix+"*")
 | 
			
		||||
	if options.Suffix != "" {
 | 
			
		||||
		rkey += options.Suffix
 | 
			
		||||
	}
 | 
			
		||||
@@ -548,10 +603,8 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
 | 
			
		||||
	var keys []string
 | 
			
		||||
	var err error
 | 
			
		||||
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		keys, err = r.cli.Client.Keys(ctx, rkey).Result()
 | 
			
		||||
	} else {
 | 
			
		||||
		err = r.cli.ClusterClient.ForEachMaster(ctx, func(nctx context.Context, cli *redis.Client) error {
 | 
			
		||||
	if c, ok := r.cli.(*goredis.ClusterClient); ok {
 | 
			
		||||
		err = c.ForEachMaster(ctx, func(nctx context.Context, cli *goredis.Client) error {
 | 
			
		||||
			nkeys, nerr := cli.Keys(nctx, rkey).Result()
 | 
			
		||||
			if nerr != nil {
 | 
			
		||||
				return nerr
 | 
			
		||||
@@ -559,13 +612,16 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
 | 
			
		||||
			keys = append(keys, nkeys...)
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	} else {
 | 
			
		||||
		keys, err = r.cli.Keys(ctx, rkey).Result()
 | 
			
		||||
	}
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	setSpanError(ctx, err)
 | 
			
		||||
 | 
			
		||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
			
		||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
			
		||||
	if err == redis.Nil {
 | 
			
		||||
	if err == goredis.Nil {
 | 
			
		||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
			
		||||
		return nil, store.ErrNotFound
 | 
			
		||||
	} else if err == nil {
 | 
			
		||||
@@ -607,79 +663,102 @@ func NewStore(opts ...store.Option) *Store {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) configure() error {
 | 
			
		||||
	var redisOptions *redis.Options
 | 
			
		||||
	var redisClusterOptions *redis.ClusterOptions
 | 
			
		||||
	var err error
 | 
			
		||||
 | 
			
		||||
	nodes := r.opts.Addrs
 | 
			
		||||
 | 
			
		||||
	if len(nodes) == 0 {
 | 
			
		||||
		nodes = []string{"redis://127.0.0.1:6379"}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if r.cli != nil && r.opts.Context == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	universalOptions := DefaultUniversalOptions
 | 
			
		||||
 | 
			
		||||
	if r.opts.Context != nil {
 | 
			
		||||
		if c, ok := r.opts.Context.Value(configKey{}).(*redis.Options); ok {
 | 
			
		||||
			redisOptions = c
 | 
			
		||||
		if o, ok := r.opts.Context.Value(configKey{}).(*goredis.Options); ok {
 | 
			
		||||
			universalOptions.Addrs = []string{o.Addr}
 | 
			
		||||
			universalOptions.Dialer = o.Dialer
 | 
			
		||||
			universalOptions.OnConnect = o.OnConnect
 | 
			
		||||
			universalOptions.Username = o.Username
 | 
			
		||||
			universalOptions.Password = o.Password
 | 
			
		||||
 | 
			
		||||
			universalOptions.MaxRetries = o.MaxRetries
 | 
			
		||||
			universalOptions.MinRetryBackoff = o.MinRetryBackoff
 | 
			
		||||
			universalOptions.MaxRetryBackoff = o.MaxRetryBackoff
 | 
			
		||||
 | 
			
		||||
			universalOptions.DialTimeout = o.DialTimeout
 | 
			
		||||
			universalOptions.ReadTimeout = o.ReadTimeout
 | 
			
		||||
			universalOptions.WriteTimeout = o.WriteTimeout
 | 
			
		||||
			universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled
 | 
			
		||||
 | 
			
		||||
			universalOptions.PoolFIFO = o.PoolFIFO
 | 
			
		||||
 | 
			
		||||
			universalOptions.PoolSize = o.PoolSize
 | 
			
		||||
			universalOptions.PoolTimeout = o.PoolTimeout
 | 
			
		||||
			universalOptions.MinIdleConns = o.MinIdleConns
 | 
			
		||||
			universalOptions.MaxIdleConns = o.MaxIdleConns
 | 
			
		||||
			universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime
 | 
			
		||||
			universalOptions.ConnMaxLifetime = o.ConnMaxLifetime
 | 
			
		||||
 | 
			
		||||
			if r.opts.TLSConfig != nil {
 | 
			
		||||
				redisOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
				universalOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if c, ok := r.opts.Context.Value(clusterConfigKey{}).(*redis.ClusterOptions); ok {
 | 
			
		||||
			redisClusterOptions = c
 | 
			
		||||
		if o, ok := r.opts.Context.Value(clusterConfigKey{}).(*goredis.ClusterOptions); ok {
 | 
			
		||||
			universalOptions.Addrs = o.Addrs
 | 
			
		||||
			universalOptions.Dialer = o.Dialer
 | 
			
		||||
			universalOptions.OnConnect = o.OnConnect
 | 
			
		||||
			universalOptions.Username = o.Username
 | 
			
		||||
			universalOptions.Password = o.Password
 | 
			
		||||
 | 
			
		||||
			universalOptions.MaxRedirects = o.MaxRedirects
 | 
			
		||||
			universalOptions.ReadOnly = o.ReadOnly
 | 
			
		||||
			universalOptions.RouteByLatency = o.RouteByLatency
 | 
			
		||||
			universalOptions.RouteRandomly = o.RouteRandomly
 | 
			
		||||
 | 
			
		||||
			universalOptions.MaxRetries = o.MaxRetries
 | 
			
		||||
			universalOptions.MinRetryBackoff = o.MinRetryBackoff
 | 
			
		||||
			universalOptions.MaxRetryBackoff = o.MaxRetryBackoff
 | 
			
		||||
 | 
			
		||||
			universalOptions.DialTimeout = o.DialTimeout
 | 
			
		||||
			universalOptions.ReadTimeout = o.ReadTimeout
 | 
			
		||||
			universalOptions.WriteTimeout = o.WriteTimeout
 | 
			
		||||
			universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled
 | 
			
		||||
 | 
			
		||||
			universalOptions.PoolFIFO = o.PoolFIFO
 | 
			
		||||
 | 
			
		||||
			universalOptions.PoolSize = o.PoolSize
 | 
			
		||||
			universalOptions.PoolTimeout = o.PoolTimeout
 | 
			
		||||
			universalOptions.MinIdleConns = o.MinIdleConns
 | 
			
		||||
			universalOptions.MaxIdleConns = o.MaxIdleConns
 | 
			
		||||
			universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime
 | 
			
		||||
			universalOptions.ConnMaxLifetime = o.ConnMaxLifetime
 | 
			
		||||
			if r.opts.TLSConfig != nil {
 | 
			
		||||
				redisClusterOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
				universalOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if o, ok := r.opts.Context.Value(universalConfigKey{}).(*goredis.UniversalOptions); ok {
 | 
			
		||||
			universalOptions = o
 | 
			
		||||
			if r.opts.TLSConfig != nil {
 | 
			
		||||
				universalOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if redisOptions != nil && redisClusterOptions != nil {
 | 
			
		||||
		return fmt.Errorf("must specify only one option Config or ClusterConfig")
 | 
			
		||||
	if len(r.opts.Addrs) > 0 {
 | 
			
		||||
		universalOptions.Addrs = r.opts.Addrs
 | 
			
		||||
	} else {
 | 
			
		||||
		universalOptions.Addrs = []string{"127.0.0.1:6379"}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if redisOptions == nil && redisClusterOptions == nil && r.cli != nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	r.cli = goredis.NewUniversalClient(universalOptions)
 | 
			
		||||
	setTracing(r.cli, r.opts.Tracer)
 | 
			
		||||
 | 
			
		||||
	if redisOptions == nil && redisClusterOptions == nil && len(nodes) == 1 {
 | 
			
		||||
		redisOptions, err = redis.ParseURL(nodes[0])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			redisOptions = DefaultOptions
 | 
			
		||||
			redisOptions.Addr = r.opts.Addrs[0]
 | 
			
		||||
			redisOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
		}
 | 
			
		||||
	} else if redisOptions == nil && redisClusterOptions == nil && len(nodes) > 1 {
 | 
			
		||||
		redisClusterOptions = DefaultClusterOptions
 | 
			
		||||
		redisClusterOptions.Addrs = r.opts.Addrs
 | 
			
		||||
		redisClusterOptions.TLSConfig = r.opts.TLSConfig
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if redisOptions != nil {
 | 
			
		||||
		c := redis.NewClient(redisOptions)
 | 
			
		||||
		setTracing(c, r.opts.Tracer)
 | 
			
		||||
		r.cli = &wrappedClient{Client: c}
 | 
			
		||||
	} else if redisClusterOptions != nil {
 | 
			
		||||
		c := redis.NewClusterClient(redisClusterOptions)
 | 
			
		||||
		setTracing(c, r.opts.Tracer)
 | 
			
		||||
		r.cli = &wrappedClient{ClusterClient: c}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	r.pool = pool.NewPool(func() *strings.Builder { return &strings.Builder{} })
 | 
			
		||||
	r.pool = pool.NewStringsPool(50)
 | 
			
		||||
 | 
			
		||||
	r.statsMeter()
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) getKey(mainNamespace string, opNamespace string, key string) string {
 | 
			
		||||
	b := r.pool.Get()
 | 
			
		||||
	defer r.pool.Put(b)
 | 
			
		||||
	b.Reset()
 | 
			
		||||
 | 
			
		||||
func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace string, key string) string {
 | 
			
		||||
	if opNamespace == "" {
 | 
			
		||||
		opNamespace = mainNamespace
 | 
			
		||||
	}
 | 
			
		||||
@@ -690,3 +769,14 @@ func (r *Store) getKey(mainNamespace string, opNamespace string, key string) str
 | 
			
		||||
	b.WriteString(key)
 | 
			
		||||
	return b.String()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) connect(ctx context.Context) (err error) {
 | 
			
		||||
	if r.isConnected.Load() == 0 {
 | 
			
		||||
		if err = r.cli.Ping(ctx).Err(); err != nil {
 | 
			
		||||
			setSpanError(ctx, err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	r.isConnected.Store(1)
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -7,6 +7,7 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	goredis "github.com/redis/go-redis/v9"
 | 
			
		||||
	"go.unistack.org/micro/v3/store"
 | 
			
		||||
	"go.unistack.org/micro/v3/tracer"
 | 
			
		||||
)
 | 
			
		||||
@@ -41,7 +42,7 @@ func TestKeepTTL(t *testing.T) {
 | 
			
		||||
func Test_rkv_configure(t *testing.T) {
 | 
			
		||||
	type fields struct {
 | 
			
		||||
		options store.Options
 | 
			
		||||
		Client  *wrappedClient
 | 
			
		||||
		Client  goredis.UniversalClient
 | 
			
		||||
	}
 | 
			
		||||
	type wantValues struct {
 | 
			
		||||
		username string
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										40
									
								
								stats.go
									
									
									
									
									
								
							
							
						
						
									
										40
									
								
								stats.go
									
									
									
									
									
								
							@@ -3,7 +3,8 @@ package redis
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	goredis "github.com/redis/go-redis/v9"
 | 
			
		||||
	"go.unistack.org/micro/v3/meter"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
@@ -13,45 +14,36 @@ var (
 | 
			
		||||
	PoolConnTotalCurrent = "pool_conn_total_current"
 | 
			
		||||
	PoolConnIdleCurrent  = "pool_conn_idle_current"
 | 
			
		||||
	PoolConnStaleTotal   = "pool_conn_stale_total"
 | 
			
		||||
 | 
			
		||||
	meterRequestTotal               = "request_total"
 | 
			
		||||
	meterRequestLatencyMicroseconds = "latency_microseconds"
 | 
			
		||||
	meterRequestDurationSeconds     = "request_duration_seconds"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Statser interface {
 | 
			
		||||
	PoolStats() *redis.PoolStats
 | 
			
		||||
	PoolStats() *goredis.PoolStats
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Store) statsMeter() {
 | 
			
		||||
	var st Statser
 | 
			
		||||
 | 
			
		||||
	if r.cli.Client != nil {
 | 
			
		||||
		st = r.cli.Client
 | 
			
		||||
	} else if r.cli.ClusterClient != nil {
 | 
			
		||||
		st = r.cli.ClusterClient
 | 
			
		||||
	if r.cli != nil {
 | 
			
		||||
		st = r.cli
 | 
			
		||||
	} else {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		ticker := time.NewTicker(DefaultMeterStatsInterval)
 | 
			
		||||
		ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
 | 
			
		||||
		defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case <-ticker.C:
 | 
			
		||||
				if st == nil {
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				stats := st.PoolStats()
 | 
			
		||||
				r.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits))
 | 
			
		||||
				r.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses))
 | 
			
		||||
				r.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts))
 | 
			
		||||
				r.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns))
 | 
			
		||||
				r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns))
 | 
			
		||||
				r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns))
 | 
			
		||||
		for _ = range ticker.C {
 | 
			
		||||
			if st == nil {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			stats := st.PoolStats()
 | 
			
		||||
			r.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits))
 | 
			
		||||
			r.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses))
 | 
			
		||||
			r.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts))
 | 
			
		||||
			r.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns))
 | 
			
		||||
			r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns))
 | 
			
		||||
			r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns))
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										67
									
								
								tracer.go
									
									
									
									
									
								
							
							
						
						
									
										67
									
								
								tracer.go
									
									
									
									
									
								
							@@ -6,27 +6,25 @@ import (
 | 
			
		||||
	"net"
 | 
			
		||||
	"strconv"
 | 
			
		||||
 | 
			
		||||
	"github.com/redis/go-redis/extra/rediscmd/v9"
 | 
			
		||||
	"github.com/redis/go-redis/v9"
 | 
			
		||||
	rediscmd "github.com/redis/go-redis/extra/rediscmd/v9"
 | 
			
		||||
	goredis "github.com/redis/go-redis/v9"
 | 
			
		||||
	"go.unistack.org/micro/v3/tracer"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func setTracing(rdb redis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) {
 | 
			
		||||
func setTracing(rdb goredis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) {
 | 
			
		||||
	switch rdb := rdb.(type) {
 | 
			
		||||
	case *redis.Client:
 | 
			
		||||
	case *goredis.Client:
 | 
			
		||||
		opt := rdb.Options()
 | 
			
		||||
		connString := formatDBConnString(opt.Network, opt.Addr)
 | 
			
		||||
		rdb.AddHook(newTracingHook(connString, tr))
 | 
			
		||||
	case *redis.ClusterClient:
 | 
			
		||||
		rdb.AddHook(newTracingHook("", tr, opts...))
 | 
			
		||||
		rdb.OnNewNode(func(rdb *redis.Client) {
 | 
			
		||||
	case *goredis.ClusterClient:
 | 
			
		||||
		rdb.OnNewNode(func(rdb *goredis.Client) {
 | 
			
		||||
			opt := rdb.Options()
 | 
			
		||||
			connString := formatDBConnString(opt.Network, opt.Addr)
 | 
			
		||||
			rdb.AddHook(newTracingHook(connString, tr))
 | 
			
		||||
		})
 | 
			
		||||
	case *redis.Ring:
 | 
			
		||||
		rdb.AddHook(newTracingHook("", tr, opts...))
 | 
			
		||||
		rdb.OnNewNode(func(rdb *redis.Client) {
 | 
			
		||||
	case *goredis.Ring:
 | 
			
		||||
		rdb.OnNewNode(func(rdb *goredis.Client) {
 | 
			
		||||
			opt := rdb.Options()
 | 
			
		||||
			connString := formatDBConnString(opt.Network, opt.Addr)
 | 
			
		||||
			rdb.AddHook(newTracingHook(connString, tr))
 | 
			
		||||
@@ -39,7 +37,7 @@ type tracingHook struct {
 | 
			
		||||
	opts []tracer.SpanOption
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ redis.Hook = (*tracingHook)(nil)
 | 
			
		||||
var _ goredis.Hook = (*tracingHook)(nil)
 | 
			
		||||
 | 
			
		||||
func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook {
 | 
			
		||||
	opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient))
 | 
			
		||||
@@ -53,45 +51,54 @@ func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOpti
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
 | 
			
		||||
func (h *tracingHook) DialHook(hook goredis.DialHook) goredis.DialHook {
 | 
			
		||||
	return func(ctx context.Context, network, addr string) (net.Conn, error) {
 | 
			
		||||
		sctx, span := h.tr.Start(ctx, "redis.dial", h.opts...)
 | 
			
		||||
		defer span.Finish()
 | 
			
		||||
 | 
			
		||||
		conn, err := hook(sctx, network, addr)
 | 
			
		||||
		recordError(span, err)
 | 
			
		||||
		/*
 | 
			
		||||
			_, span := h.tr.Start(ctx, "goredis.dial", h.opts...)
 | 
			
		||||
			defer span.Finish()
 | 
			
		||||
		*/
 | 
			
		||||
		conn, err := hook(ctx, network, addr)
 | 
			
		||||
		// recordError(span, err)
 | 
			
		||||
 | 
			
		||||
		return conn, err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
 | 
			
		||||
	return func(ctx context.Context, cmd redis.Cmder) error {
 | 
			
		||||
func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook {
 | 
			
		||||
	return func(ctx context.Context, cmd goredis.Cmder) error {
 | 
			
		||||
		cmdString := rediscmd.CmdString(cmd)
 | 
			
		||||
		var err error
 | 
			
		||||
 | 
			
		||||
		sctx, span := h.tr.Start(ctx, "redis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...)
 | 
			
		||||
		defer span.Finish()
 | 
			
		||||
		switch cmdString {
 | 
			
		||||
		case "cluster slots":
 | 
			
		||||
			break
 | 
			
		||||
		default:
 | 
			
		||||
			_, span := h.tr.Start(ctx, "goredis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...)
 | 
			
		||||
			defer func() {
 | 
			
		||||
				recordError(span, err)
 | 
			
		||||
				span.Finish()
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		err := hook(sctx, cmd)
 | 
			
		||||
		recordError(span, err)
 | 
			
		||||
		err = hook(ctx, cmd)
 | 
			
		||||
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
 | 
			
		||||
	return func(ctx context.Context, cmds []redis.Cmder) error {
 | 
			
		||||
func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook {
 | 
			
		||||
	return func(ctx context.Context, cmds []goredis.Cmder) error {
 | 
			
		||||
		_, cmdsString := rediscmd.CmdsString(cmds)
 | 
			
		||||
 | 
			
		||||
		opts := append(h.opts, tracer.WithSpanLabels(
 | 
			
		||||
			"db.redis.num_cmd", strconv.Itoa(len(cmds)),
 | 
			
		||||
			"db.goredis.num_cmd", strconv.Itoa(len(cmds)),
 | 
			
		||||
			"db.statement", cmdsString,
 | 
			
		||||
		))
 | 
			
		||||
 | 
			
		||||
		sctx, span := h.tr.Start(ctx, "redis.process_pipeline", opts...)
 | 
			
		||||
		_, span := h.tr.Start(ctx, "goredis.process_pipeline", opts...)
 | 
			
		||||
		defer span.Finish()
 | 
			
		||||
 | 
			
		||||
		err := hook(sctx, cmds)
 | 
			
		||||
		err := hook(ctx, cmds)
 | 
			
		||||
		recordError(span, err)
 | 
			
		||||
 | 
			
		||||
		return err
 | 
			
		||||
@@ -99,7 +106,7 @@ func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func setSpanError(ctx context.Context, err error) {
 | 
			
		||||
	if err == nil || err == redis.Nil {
 | 
			
		||||
	if err == nil || err == goredis.Nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil {
 | 
			
		||||
@@ -108,7 +115,7 @@ func setSpanError(ctx context.Context, err error) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func recordError(span tracer.Span, err error) {
 | 
			
		||||
	if err != nil && err != redis.Nil {
 | 
			
		||||
	if err != nil && err != goredis.Nil {
 | 
			
		||||
		span.SetStatus(tracer.SpanStatusError, err.Error())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user