Compare commits
	
		
			17 Commits
		
	
	
		
			v3.10.11
			...
			9c3d0a6a6e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 9c3d0a6a6e | |||
| 15d7ad156f | |||
| 0981f89f60 | |||
| 332fe5f4d4 | |||
| 757fe0245b | |||
| 27eccc1ed2 | |||
| 7c641fa8ac | |||
| 24c9f20196 | |||
| 953b5b0021 | |||
| 87e2e2b947 | |||
| 256e61a437 | |||
| f9cdd41c94 | |||
| ecad15fe17 | |||
| fa3d18b353 | |||
| 2f3951773f | |||
| b263e14032 | |||
| 518cc1db73 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@@ -0,0 +1 @@
 | 
				
			|||||||
 | 
					.idea
 | 
				
			||||||
							
								
								
									
										13
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								go.mod
									
									
									
									
									
								
							@@ -1,16 +1,19 @@
 | 
				
			|||||||
module go.unistack.org/micro-store-redis/v3
 | 
					module go.unistack.org/micro-store-redis/v3
 | 
				
			||||||
 | 
					
 | 
				
			||||||
go 1.21
 | 
					go 1.22
 | 
				
			||||||
 | 
					
 | 
				
			||||||
toolchain go1.22.4
 | 
					toolchain go1.22.4
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
 | 
						github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0
 | 
				
			||||||
	github.com/redis/go-redis/v9 v9.5.3
 | 
						github.com/redis/go-redis/v9 v9.7.0
 | 
				
			||||||
	go.unistack.org/micro/v3 v3.10.80
 | 
						go.unistack.org/micro/v3 v3.10.105
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 | 
						github.com/cespare/xxhash/v2 v2.3.0 // indirect
 | 
				
			||||||
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 | 
						github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
 | 
				
			||||||
 | 
						github.com/google/go-cmp v0.6.0 // indirect
 | 
				
			||||||
 | 
						go.unistack.org/micro-proto/v3 v3.4.1 // indirect
 | 
				
			||||||
 | 
						google.golang.org/protobuf v1.35.2 // indirect
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										22
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								go.sum
									
									
									
									
									
								
							@@ -2,13 +2,19 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
 | 
				
			|||||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
 | 
					github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
 | 
				
			||||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
 | 
					github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
 | 
				
			||||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 | 
					github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 | 
				
			||||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 | 
					github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 | 
				
			||||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 | 
					github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 | 
				
			||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 | 
					github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 | 
				
			||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 | 
					github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 | 
				
			||||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20=
 | 
					github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 | 
				
			||||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ=
 | 
					github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 | 
				
			||||||
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
 | 
					github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk=
 | 
				
			||||||
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
 | 
					github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik=
 | 
				
			||||||
go.unistack.org/micro/v3 v3.10.80 h1:A0zWNoM9MOcMg9gdFFgVkgbT3uSYVIINhuvumX9nP2o=
 | 
					github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
 | 
				
			||||||
go.unistack.org/micro/v3 v3.10.80/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
 | 
					github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
 | 
				
			||||||
 | 
					go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
 | 
				
			||||||
 | 
					go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
 | 
				
			||||||
 | 
					go.unistack.org/micro/v3 v3.10.105 h1:JYNV0d+fnR7Hy8d4/sjr+25DbSNqq1Z7IPeDDdB+f1I=
 | 
				
			||||||
 | 
					go.unistack.org/micro/v3 v3.10.105/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g=
 | 
				
			||||||
 | 
					google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
 | 
				
			||||||
 | 
					google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										46
									
								
								options.go
									
									
									
									
									
								
							
							
						
						
									
										46
									
								
								options.go
									
									
									
									
									
								
							@@ -1,9 +1,7 @@
 | 
				
			|||||||
package redis
 | 
					package redis
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"time"
 | 
						goredis "github.com/redis/go-redis/v9"
 | 
				
			||||||
 | 
					 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
					 | 
				
			||||||
	"go.unistack.org/micro/v3/logger"
 | 
						"go.unistack.org/micro/v3/logger"
 | 
				
			||||||
	"go.unistack.org/micro/v3/meter"
 | 
						"go.unistack.org/micro/v3/meter"
 | 
				
			||||||
	"go.unistack.org/micro/v3/store"
 | 
						"go.unistack.org/micro/v3/store"
 | 
				
			||||||
@@ -12,21 +10,25 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type configKey struct{}
 | 
					type configKey struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Config(c *redis.Options) store.Option {
 | 
					func Config(c *goredis.Options) store.Option {
 | 
				
			||||||
	return store.SetOption(configKey{}, c)
 | 
						return store.SetOption(configKey{}, c)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type clusterConfigKey struct{}
 | 
					type clusterConfigKey struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func ClusterConfig(c *redis.ClusterOptions) store.Option {
 | 
					func ClusterConfig(c *goredis.ClusterOptions) store.Option {
 | 
				
			||||||
	return store.SetOption(clusterConfigKey{}, c)
 | 
						return store.SetOption(clusterConfigKey{}, c)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type universalConfigKey struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func UniversalConfig(c *goredis.UniversalOptions) store.Option {
 | 
				
			||||||
 | 
						return store.SetOption(universalConfigKey{}, c)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
	// DefaultMeterStatsInterval holds default stats interval
 | 
						labelHost = "redis_host"
 | 
				
			||||||
	DefaultMeterStatsInterval = 5 * time.Second
 | 
						labelName = "redis_name"
 | 
				
			||||||
	// DefaultMeterMetricPrefix holds default metric prefix
 | 
					 | 
				
			||||||
	DefaultMeterMetricPrefix = "micro_store_"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Options struct holds wrapper options
 | 
					// Options struct holds wrapper options
 | 
				
			||||||
@@ -34,8 +36,8 @@ type Options struct {
 | 
				
			|||||||
	Logger    logger.Logger
 | 
						Logger    logger.Logger
 | 
				
			||||||
	Meter     meter.Meter
 | 
						Meter     meter.Meter
 | 
				
			||||||
	Tracer    tracer.Tracer
 | 
						Tracer    tracer.Tracer
 | 
				
			||||||
	MeterMetricPrefix  string
 | 
						RedisHost string
 | 
				
			||||||
	MeterStatsInterval time.Duration
 | 
						RedisName string
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Option func signature
 | 
					// Option func signature
 | 
				
			||||||
@@ -47,8 +49,6 @@ func NewOptions(opts ...Option) Options {
 | 
				
			|||||||
		Logger: logger.DefaultLogger,
 | 
							Logger: logger.DefaultLogger,
 | 
				
			||||||
		Meter:  meter.DefaultMeter,
 | 
							Meter:  meter.DefaultMeter,
 | 
				
			||||||
		Tracer: tracer.DefaultTracer,
 | 
							Tracer: tracer.DefaultTracer,
 | 
				
			||||||
		MeterStatsInterval: DefaultMeterStatsInterval,
 | 
					 | 
				
			||||||
		MeterMetricPrefix:  DefaultMeterMetricPrefix,
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, o := range opts {
 | 
						for _, o := range opts {
 | 
				
			||||||
@@ -56,24 +56,12 @@ func NewOptions(opts ...Option) Options {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options.Meter = options.Meter.Clone(
 | 
						options.Meter = options.Meter.Clone(
 | 
				
			||||||
		meter.MetricPrefix(options.MeterMetricPrefix),
 | 
							meter.Labels(
 | 
				
			||||||
 | 
								labelHost, options.RedisHost,
 | 
				
			||||||
 | 
								labelName, options.RedisName),
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
 | 
						options.Logger = options.Logger.Clone(logger.WithAddCallerSkipCount(1))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return options
 | 
						return options
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// MetricInterval specifies stats interval for *sql.DB
 | 
					 | 
				
			||||||
func MetricInterval(td time.Duration) Option {
 | 
					 | 
				
			||||||
	return func(o *Options) {
 | 
					 | 
				
			||||||
		o.MeterStatsInterval = td
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// MetricPrefix specifies prefix for each metric
 | 
					 | 
				
			||||||
func MetricPrefix(pref string) Option {
 | 
					 | 
				
			||||||
	return func(o *Options) {
 | 
					 | 
				
			||||||
		o.MeterMetricPrefix = pref
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										376
									
								
								redis.go
									
									
									
									
									
								
							
							
						
						
									
										376
									
								
								redis.go
									
									
									
									
									
								
							@@ -2,12 +2,12 @@ package redis
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"reflect"
 | 
						"reflect"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	redis "github.com/redis/go-redis/v9"
 | 
						goredis "github.com/redis/go-redis/v9"
 | 
				
			||||||
	"go.unistack.org/micro/v3/semconv"
 | 
						"go.unistack.org/micro/v3/semconv"
 | 
				
			||||||
	"go.unistack.org/micro/v3/store"
 | 
						"go.unistack.org/micro/v3/store"
 | 
				
			||||||
	pool "go.unistack.org/micro/v3/util/xpool"
 | 
						pool "go.unistack.org/micro/v3/util/xpool"
 | 
				
			||||||
@@ -16,7 +16,7 @@ import (
 | 
				
			|||||||
var (
 | 
					var (
 | 
				
			||||||
	DefaultPathSeparator = "/"
 | 
						DefaultPathSeparator = "/"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	DefaultClusterOptions = &redis.ClusterOptions{
 | 
						DefaultUniversalOptions = &goredis.UniversalOptions{
 | 
				
			||||||
		Username:        "",
 | 
							Username:        "",
 | 
				
			||||||
		Password:        "", // no password set
 | 
							Password:        "", // no password set
 | 
				
			||||||
		MaxRetries:      2,
 | 
							MaxRetries:      2,
 | 
				
			||||||
@@ -28,7 +28,19 @@ var (
 | 
				
			|||||||
		MinIdleConns:    10,
 | 
							MinIdleConns:    10,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	DefaultOptions = &redis.Options{
 | 
						DefaultClusterOptions = &goredis.ClusterOptions{
 | 
				
			||||||
 | 
							Username:        "",
 | 
				
			||||||
 | 
							Password:        "", // no password set
 | 
				
			||||||
 | 
							MaxRetries:      2,
 | 
				
			||||||
 | 
							MaxRetryBackoff: 256 * time.Millisecond,
 | 
				
			||||||
 | 
							DialTimeout:     1 * time.Second,
 | 
				
			||||||
 | 
							ReadTimeout:     1 * time.Second,
 | 
				
			||||||
 | 
							WriteTimeout:    1 * time.Second,
 | 
				
			||||||
 | 
							PoolTimeout:     1 * time.Second,
 | 
				
			||||||
 | 
							MinIdleConns:    10,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						DefaultOptions = &goredis.Options{
 | 
				
			||||||
		Username:        "",
 | 
							Username:        "",
 | 
				
			||||||
		Password:        "", // no password set
 | 
							Password:        "", // no password set
 | 
				
			||||||
		DB:              0,  // use default DB
 | 
							DB:              0,  // use default DB
 | 
				
			||||||
@@ -44,24 +56,21 @@ var (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type Store struct {
 | 
					type Store struct {
 | 
				
			||||||
	opts        store.Options
 | 
						opts        store.Options
 | 
				
			||||||
	cli  *wrappedClient
 | 
						cli         goredis.UniversalClient
 | 
				
			||||||
	done        chan struct{}
 | 
						done        chan struct{}
 | 
				
			||||||
	pool pool.Pool[*strings.Builder]
 | 
						pool        *pool.StringsPool
 | 
				
			||||||
}
 | 
						isConnected atomic.Int32
 | 
				
			||||||
 | 
					 | 
				
			||||||
type wrappedClient struct {
 | 
					 | 
				
			||||||
	*redis.Client
 | 
					 | 
				
			||||||
	*redis.ClusterClient
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Connect(ctx context.Context) error {
 | 
					func (r *Store) Connect(ctx context.Context) error {
 | 
				
			||||||
	var err error
 | 
						if r.cli == nil {
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
							return store.ErrNotConnected
 | 
				
			||||||
		err = r.cli.Client.Ping(ctx).Err()
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	err = r.cli.ClusterClient.Ping(ctx).Err()
 | 
						if r.opts.LazyConnect {
 | 
				
			||||||
	setSpanError(ctx, err)
 | 
							return nil
 | 
				
			||||||
	return err
 | 
						}
 | 
				
			||||||
 | 
						return r.connect(ctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Init(opts ...store.Option) error {
 | 
					func (r *Store) Init(opts ...store.Option) error {
 | 
				
			||||||
@@ -77,16 +86,20 @@ func (r *Store) Init(opts ...store.Option) error {
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Client() *redis.Client {
 | 
					func (r *Store) Client() *goredis.Client {
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
						if c, ok := r.cli.(*goredis.Client); ok {
 | 
				
			||||||
		return r.cli.Client
 | 
							return c
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) ClusterClient() *redis.ClusterClient {
 | 
					func (r *Store) UniversalClient() goredis.UniversalClient {
 | 
				
			||||||
	if r.cli.ClusterClient != nil {
 | 
						return r.cli
 | 
				
			||||||
		return r.cli.ClusterClient
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (r *Store) ClusterClient() *goredis.ClusterClient {
 | 
				
			||||||
 | 
						if c, ok := r.cli.(*goredis.ClusterClient); ok {
 | 
				
			||||||
 | 
							return c
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -97,10 +110,10 @@ func (r *Store) Disconnect(ctx context.Context) error {
 | 
				
			|||||||
	case <-r.done:
 | 
						case <-r.done:
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
		if r.cli.Client != nil {
 | 
							if r.cli != nil {
 | 
				
			||||||
			err = r.cli.Client.Close()
 | 
								if err = r.cli.Close(); err != nil {
 | 
				
			||||||
		} else if r.cli.ClusterClient != nil {
 | 
									r.isConnected.Store(0)
 | 
				
			||||||
			err = r.cli.ClusterClient.Close()
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		close(r.done)
 | 
							close(r.done)
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
@@ -108,6 +121,12 @@ func (r *Store) Disconnect(ctx context.Context) error {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
 | 
					func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b := r.pool.Get()
 | 
				
			||||||
 | 
						defer r.pool.Put(b)
 | 
				
			||||||
	options := store.NewExistsOptions(opts...)
 | 
						options := store.NewExistsOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -121,23 +140,17 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
 | 
				
			|||||||
		defer cancel()
 | 
							defer cancel()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
						rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
				
			||||||
	ts := time.Now()
 | 
						ts := time.Now()
 | 
				
			||||||
	var err error
 | 
						val, err := r.cli.Exists(ctx, rkey).Result()
 | 
				
			||||||
	var val int64
 | 
					 | 
				
			||||||
	if r.cli.Client != nil {
 | 
					 | 
				
			||||||
		val, err = r.cli.Client.Exists(ctx, rkey).Result()
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		val, err = r.cli.ClusterClient.Exists(ctx, rkey).Result()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	setSpanError(ctx, err)
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil || (err == nil && val == 0) {
 | 
						if err == goredis.Nil || (err == nil && val == 0) {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -151,6 +164,13 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
 | 
					func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b := r.pool.Get()
 | 
				
			||||||
 | 
						defer r.pool.Put(b)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewReadOptions(opts...)
 | 
						options := store.NewReadOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -164,23 +184,17 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
 | 
				
			|||||||
		defer cancel()
 | 
							defer cancel()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
						rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
				
			||||||
	ts := time.Now()
 | 
						ts := time.Now()
 | 
				
			||||||
	var buf []byte
 | 
						buf, err := r.cli.Get(ctx, rkey).Bytes()
 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
	if r.cli.Client != nil {
 | 
					 | 
				
			||||||
		buf, err = r.cli.Client.Get(ctx, rkey).Bytes()
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		buf, err = r.cli.ClusterClient.Get(ctx, rkey).Bytes()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	setSpanError(ctx, err)
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil || (err == nil && buf == nil) {
 | 
						if err == goredis.Nil || (err == nil && buf == nil) {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -205,6 +219,10 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error {
 | 
					func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewReadOptions(opts...)
 | 
						options := store.NewReadOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -218,9 +236,15 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
 | 
				
			|||||||
		defer cancel()
 | 
							defer cancel()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var rkeys []string
 | 
				
			||||||
 | 
						var pools []*strings.Builder
 | 
				
			||||||
	if r.opts.Namespace != "" || options.Namespace != "" {
 | 
						if r.opts.Namespace != "" || options.Namespace != "" {
 | 
				
			||||||
 | 
							rkeys = make([]string, len(keys))
 | 
				
			||||||
 | 
							pools = make([]*strings.Builder, len(keys))
 | 
				
			||||||
		for idx, key := range keys {
 | 
							for idx, key := range keys {
 | 
				
			||||||
			keys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
								b := r.pool.Get()
 | 
				
			||||||
 | 
								pools[idx] = b
 | 
				
			||||||
 | 
								rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -228,17 +252,20 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
 | 
				
			|||||||
	ts := time.Now()
 | 
						ts := time.Now()
 | 
				
			||||||
	var rvals []interface{}
 | 
						var rvals []interface{}
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
						if r.opts.Namespace != "" || options.Namespace != "" {
 | 
				
			||||||
		rvals, err = r.cli.Client.MGet(ctx, keys...).Result()
 | 
							rvals, err = r.cli.MGet(ctx, rkeys...).Result()
 | 
				
			||||||
 | 
							for idx := range pools {
 | 
				
			||||||
 | 
								r.pool.Put(pools[idx])
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		rvals, err = r.cli.ClusterClient.MGet(ctx, keys...).Result()
 | 
							rvals, err = r.cli.MGet(ctx, keys...).Result()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	setSpanError(ctx, err)
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil || (len(rvals) == 0) {
 | 
						if err == goredis.Nil || (len(rvals) == 0) {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -295,6 +322,10 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error {
 | 
					func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewDeleteOptions(opts...)
 | 
						options := store.NewDeleteOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -308,26 +339,35 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
 | 
				
			|||||||
		defer cancel()
 | 
							defer cancel()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var rkeys []string
 | 
				
			||||||
 | 
						var pools []*strings.Builder
 | 
				
			||||||
	if r.opts.Namespace != "" || options.Namespace != "" {
 | 
						if r.opts.Namespace != "" || options.Namespace != "" {
 | 
				
			||||||
 | 
							rkeys = make([]string, len(keys))
 | 
				
			||||||
 | 
							pools = make([]*strings.Builder, len(keys))
 | 
				
			||||||
		for idx, key := range keys {
 | 
							for idx, key := range keys {
 | 
				
			||||||
			keys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
								b := r.pool.Get()
 | 
				
			||||||
 | 
								pools[idx] = b
 | 
				
			||||||
 | 
								rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
				
			||||||
	ts := time.Now()
 | 
						ts := time.Now()
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
						if r.opts.Namespace != "" || options.Namespace != "" {
 | 
				
			||||||
		err = r.cli.Client.Del(ctx, keys...).Err()
 | 
							err = r.cli.Del(ctx, rkeys...).Err()
 | 
				
			||||||
 | 
							for idx := range pools {
 | 
				
			||||||
 | 
								r.pool.Put(pools[idx])
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		err = r.cli.ClusterClient.Del(ctx, keys...).Err()
 | 
							err = r.cli.Del(ctx, keys...).Err()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	setSpanError(ctx, err)
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil {
 | 
						if err == goredis.Nil {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -341,6 +381,13 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
 | 
					func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b := r.pool.Get()
 | 
				
			||||||
 | 
						defer r.pool.Put(b)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewDeleteOptions(opts...)
 | 
						options := store.NewDeleteOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -356,18 +403,13 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
				
			||||||
	ts := time.Now()
 | 
						ts := time.Now()
 | 
				
			||||||
	var err error
 | 
						err := r.cli.Del(ctx, r.getKey(b, r.opts.Namespace, options.Namespace, key)).Err()
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
					 | 
				
			||||||
		err = r.cli.Client.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		err = r.cli.ClusterClient.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	setSpanError(ctx, err)
 | 
					 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil {
 | 
						if err == goredis.Nil {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -381,6 +423,10 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error {
 | 
					func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewWriteOptions(opts...)
 | 
						options := store.NewWriteOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -395,9 +441,11 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	kvs := make([]string, 0, len(keys)*2)
 | 
						kvs := make([]string, 0, len(keys)*2)
 | 
				
			||||||
 | 
						pools := make([]*strings.Builder, len(keys))
 | 
				
			||||||
	for idx, key := range keys {
 | 
						for idx, key := range keys {
 | 
				
			||||||
		kvs = append(kvs, r.getKey(r.opts.Namespace, options.Namespace, key))
 | 
							b := r.pool.Get()
 | 
				
			||||||
 | 
							pools[idx] = b
 | 
				
			||||||
 | 
							kvs = append(kvs, r.getKey(b, r.opts.Namespace, options.Namespace, key))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		switch vt := vals[idx].(type) {
 | 
							switch vt := vals[idx].(type) {
 | 
				
			||||||
		case string:
 | 
							case string:
 | 
				
			||||||
@@ -414,9 +462,8 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
				
			||||||
	ts := time.Now()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pipeliner := func(pipe redis.Pipeliner) error {
 | 
						pipeliner := func(pipe goredis.Pipeliner) error {
 | 
				
			||||||
		for idx := 0; idx < len(kvs); idx += 2 {
 | 
							for idx := 0; idx < len(kvs); idx += 2 {
 | 
				
			||||||
			if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil {
 | 
								if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil {
 | 
				
			||||||
				setSpanError(ctx, err)
 | 
									setSpanError(ctx, err)
 | 
				
			||||||
@@ -426,20 +473,18 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
				
			|||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var err error
 | 
						ts := time.Now()
 | 
				
			||||||
	var cmds []redis.Cmder
 | 
						cmds, err := r.cli.Pipelined(ctx, pipeliner)
 | 
				
			||||||
 | 
						for idx := range pools {
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
							r.pool.Put(pools[idx])
 | 
				
			||||||
		cmds, err = r.cli.Client.Pipelined(ctx, pipeliner)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		cmds, err = r.cli.ClusterClient.Pipelined(ctx, pipeliner)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	setSpanError(ctx, err)
 | 
					 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil {
 | 
						if err == goredis.Nil {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -451,7 +496,7 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	for _, cmd := range cmds {
 | 
						for _, cmd := range cmds {
 | 
				
			||||||
		if err = cmd.Err(); err != nil {
 | 
							if err = cmd.Err(); err != nil {
 | 
				
			||||||
			if err == redis.Nil {
 | 
								if err == goredis.Nil {
 | 
				
			||||||
				r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
									r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
				return store.ErrNotFound
 | 
									return store.ErrNotFound
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -465,6 +510,13 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
 | 
					func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b := r.pool.Get()
 | 
				
			||||||
 | 
						defer r.pool.Put(b)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewWriteOptions(opts...)
 | 
						options := store.NewWriteOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timeout := r.opts.Timeout
 | 
						timeout := r.opts.Timeout
 | 
				
			||||||
@@ -478,7 +530,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
 | 
				
			|||||||
		defer cancel()
 | 
							defer cancel()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rkey := r.getKey(r.opts.Namespace, options.Namespace, key)
 | 
						rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var buf []byte
 | 
						var buf []byte
 | 
				
			||||||
	switch vt := val.(type) {
 | 
						switch vt := val.(type) {
 | 
				
			||||||
@@ -496,18 +548,14 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc()
 | 
				
			||||||
	ts := time.Now()
 | 
						ts := time.Now()
 | 
				
			||||||
	var err error
 | 
						err := r.cli.Set(ctx, rkey, buf, options.TTL).Err()
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
					 | 
				
			||||||
		err = r.cli.Client.Set(ctx, rkey, buf, options.TTL).Err()
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		err = r.cli.ClusterClient.Set(ctx, rkey, buf, options.TTL).Err()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	setSpanError(ctx, err)
 | 
					 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil {
 | 
						if err == goredis.Nil {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return store.ErrNotFound
 | 
							return store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -521,12 +569,19 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
 | 
					func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) {
 | 
				
			||||||
 | 
						if err := r.connect(ctx); err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						b := r.pool.Get()
 | 
				
			||||||
 | 
						defer r.pool.Put(b)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	options := store.NewListOptions(opts...)
 | 
						options := store.NewListOptions(opts...)
 | 
				
			||||||
	if len(options.Namespace) == 0 {
 | 
						if len(options.Namespace) == 0 {
 | 
				
			||||||
		options.Namespace = r.opts.Namespace
 | 
							options.Namespace = r.opts.Namespace
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rkey := r.getKey(options.Namespace, "", options.Prefix+"*")
 | 
						rkey := r.getKey(b, options.Namespace, "", options.Prefix+"*")
 | 
				
			||||||
	if options.Suffix != "" {
 | 
						if options.Suffix != "" {
 | 
				
			||||||
		rkey += options.Suffix
 | 
							rkey += options.Suffix
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -548,10 +603,8 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
 | 
				
			|||||||
	var keys []string
 | 
						var keys []string
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
						if c, ok := r.cli.(*goredis.ClusterClient); ok {
 | 
				
			||||||
		keys, err = r.cli.Client.Keys(ctx, rkey).Result()
 | 
							err = c.ForEachMaster(ctx, func(nctx context.Context, cli *goredis.Client) error {
 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		err = r.cli.ClusterClient.ForEachMaster(ctx, func(nctx context.Context, cli *redis.Client) error {
 | 
					 | 
				
			||||||
			nkeys, nerr := cli.Keys(nctx, rkey).Result()
 | 
								nkeys, nerr := cli.Keys(nctx, rkey).Result()
 | 
				
			||||||
			if nerr != nil {
 | 
								if nerr != nil {
 | 
				
			||||||
				return nerr
 | 
									return nerr
 | 
				
			||||||
@@ -559,13 +612,16 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e
 | 
				
			|||||||
			keys = append(keys, nkeys...)
 | 
								keys = append(keys, nkeys...)
 | 
				
			||||||
			return nil
 | 
								return nil
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							keys, err = r.cli.Keys(ctx, rkey).Result()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	setSpanError(ctx, err)
 | 
					 | 
				
			||||||
	te := time.Since(ts)
 | 
						te := time.Since(ts)
 | 
				
			||||||
 | 
						setSpanError(ctx, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
						r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec()
 | 
				
			||||||
	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
						r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds())
 | 
				
			||||||
	if err == redis.Nil {
 | 
						if err == goredis.Nil {
 | 
				
			||||||
		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
							r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc()
 | 
				
			||||||
		return nil, store.ErrNotFound
 | 
							return nil, store.ErrNotFound
 | 
				
			||||||
	} else if err == nil {
 | 
						} else if err == nil {
 | 
				
			||||||
@@ -607,79 +663,102 @@ func NewStore(opts ...store.Option) *Store {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) configure() error {
 | 
					func (r *Store) configure() error {
 | 
				
			||||||
	var redisOptions *redis.Options
 | 
					 | 
				
			||||||
	var redisClusterOptions *redis.ClusterOptions
 | 
					 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	nodes := r.opts.Addrs
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if len(nodes) == 0 {
 | 
					 | 
				
			||||||
		nodes = []string{"redis://127.0.0.1:6379"}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if r.cli != nil && r.opts.Context == nil {
 | 
						if r.cli != nil && r.opts.Context == nil {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						universalOptions := DefaultUniversalOptions
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if r.opts.Context != nil {
 | 
						if r.opts.Context != nil {
 | 
				
			||||||
		if c, ok := r.opts.Context.Value(configKey{}).(*redis.Options); ok {
 | 
							if o, ok := r.opts.Context.Value(configKey{}).(*goredis.Options); ok {
 | 
				
			||||||
			redisOptions = c
 | 
								universalOptions.Addrs = []string{o.Addr}
 | 
				
			||||||
 | 
								universalOptions.Dialer = o.Dialer
 | 
				
			||||||
 | 
								universalOptions.OnConnect = o.OnConnect
 | 
				
			||||||
 | 
								universalOptions.Username = o.Username
 | 
				
			||||||
 | 
								universalOptions.Password = o.Password
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.MaxRetries = o.MaxRetries
 | 
				
			||||||
 | 
								universalOptions.MinRetryBackoff = o.MinRetryBackoff
 | 
				
			||||||
 | 
								universalOptions.MaxRetryBackoff = o.MaxRetryBackoff
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.DialTimeout = o.DialTimeout
 | 
				
			||||||
 | 
								universalOptions.ReadTimeout = o.ReadTimeout
 | 
				
			||||||
 | 
								universalOptions.WriteTimeout = o.WriteTimeout
 | 
				
			||||||
 | 
								universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.PoolFIFO = o.PoolFIFO
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.PoolSize = o.PoolSize
 | 
				
			||||||
 | 
								universalOptions.PoolTimeout = o.PoolTimeout
 | 
				
			||||||
 | 
								universalOptions.MinIdleConns = o.MinIdleConns
 | 
				
			||||||
 | 
								universalOptions.MaxIdleConns = o.MaxIdleConns
 | 
				
			||||||
 | 
								universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime
 | 
				
			||||||
 | 
								universalOptions.ConnMaxLifetime = o.ConnMaxLifetime
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if r.opts.TLSConfig != nil {
 | 
								if r.opts.TLSConfig != nil {
 | 
				
			||||||
				redisOptions.TLSConfig = r.opts.TLSConfig
 | 
									universalOptions.TLSConfig = r.opts.TLSConfig
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if c, ok := r.opts.Context.Value(clusterConfigKey{}).(*redis.ClusterOptions); ok {
 | 
							if o, ok := r.opts.Context.Value(clusterConfigKey{}).(*goredis.ClusterOptions); ok {
 | 
				
			||||||
			redisClusterOptions = c
 | 
								universalOptions.Addrs = o.Addrs
 | 
				
			||||||
 | 
								universalOptions.Dialer = o.Dialer
 | 
				
			||||||
 | 
								universalOptions.OnConnect = o.OnConnect
 | 
				
			||||||
 | 
								universalOptions.Username = o.Username
 | 
				
			||||||
 | 
								universalOptions.Password = o.Password
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.MaxRedirects = o.MaxRedirects
 | 
				
			||||||
 | 
								universalOptions.ReadOnly = o.ReadOnly
 | 
				
			||||||
 | 
								universalOptions.RouteByLatency = o.RouteByLatency
 | 
				
			||||||
 | 
								universalOptions.RouteRandomly = o.RouteRandomly
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.MaxRetries = o.MaxRetries
 | 
				
			||||||
 | 
								universalOptions.MinRetryBackoff = o.MinRetryBackoff
 | 
				
			||||||
 | 
								universalOptions.MaxRetryBackoff = o.MaxRetryBackoff
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.DialTimeout = o.DialTimeout
 | 
				
			||||||
 | 
								universalOptions.ReadTimeout = o.ReadTimeout
 | 
				
			||||||
 | 
								universalOptions.WriteTimeout = o.WriteTimeout
 | 
				
			||||||
 | 
								universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.PoolFIFO = o.PoolFIFO
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								universalOptions.PoolSize = o.PoolSize
 | 
				
			||||||
 | 
								universalOptions.PoolTimeout = o.PoolTimeout
 | 
				
			||||||
 | 
								universalOptions.MinIdleConns = o.MinIdleConns
 | 
				
			||||||
 | 
								universalOptions.MaxIdleConns = o.MaxIdleConns
 | 
				
			||||||
 | 
								universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime
 | 
				
			||||||
 | 
								universalOptions.ConnMaxLifetime = o.ConnMaxLifetime
 | 
				
			||||||
			if r.opts.TLSConfig != nil {
 | 
								if r.opts.TLSConfig != nil {
 | 
				
			||||||
				redisClusterOptions.TLSConfig = r.opts.TLSConfig
 | 
									universalOptions.TLSConfig = r.opts.TLSConfig
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if o, ok := r.opts.Context.Value(universalConfigKey{}).(*goredis.UniversalOptions); ok {
 | 
				
			||||||
 | 
								universalOptions = o
 | 
				
			||||||
 | 
								if r.opts.TLSConfig != nil {
 | 
				
			||||||
 | 
									universalOptions.TLSConfig = r.opts.TLSConfig
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if redisOptions != nil && redisClusterOptions != nil {
 | 
						if len(r.opts.Addrs) > 0 {
 | 
				
			||||||
		return fmt.Errorf("must specify only one option Config or ClusterConfig")
 | 
							universalOptions.Addrs = r.opts.Addrs
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							universalOptions.Addrs = []string{"127.0.0.1:6379"}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if redisOptions == nil && redisClusterOptions == nil && r.cli != nil {
 | 
						r.cli = goredis.NewUniversalClient(universalOptions)
 | 
				
			||||||
		return nil
 | 
						setTracing(r.cli, r.opts.Tracer)
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if redisOptions == nil && redisClusterOptions == nil && len(nodes) == 1 {
 | 
						r.pool = pool.NewStringsPool(50)
 | 
				
			||||||
		redisOptions, err = redis.ParseURL(nodes[0])
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			redisOptions = DefaultOptions
 | 
					 | 
				
			||||||
			redisOptions.Addr = r.opts.Addrs[0]
 | 
					 | 
				
			||||||
			redisOptions.TLSConfig = r.opts.TLSConfig
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	} else if redisOptions == nil && redisClusterOptions == nil && len(nodes) > 1 {
 | 
					 | 
				
			||||||
		redisClusterOptions = DefaultClusterOptions
 | 
					 | 
				
			||||||
		redisClusterOptions.Addrs = r.opts.Addrs
 | 
					 | 
				
			||||||
		redisClusterOptions.TLSConfig = r.opts.TLSConfig
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if redisOptions != nil {
 | 
					 | 
				
			||||||
		c := redis.NewClient(redisOptions)
 | 
					 | 
				
			||||||
		setTracing(c, r.opts.Tracer)
 | 
					 | 
				
			||||||
		r.cli = &wrappedClient{Client: c}
 | 
					 | 
				
			||||||
	} else if redisClusterOptions != nil {
 | 
					 | 
				
			||||||
		c := redis.NewClusterClient(redisClusterOptions)
 | 
					 | 
				
			||||||
		setTracing(c, r.opts.Tracer)
 | 
					 | 
				
			||||||
		r.cli = &wrappedClient{ClusterClient: c}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	r.pool = pool.NewPool(func() *strings.Builder { return &strings.Builder{} })
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	r.statsMeter()
 | 
						r.statsMeter()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) getKey(mainNamespace string, opNamespace string, key string) string {
 | 
					func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace string, key string) string {
 | 
				
			||||||
	b := r.pool.Get()
 | 
					 | 
				
			||||||
	defer r.pool.Put(b)
 | 
					 | 
				
			||||||
	b.Reset()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if opNamespace == "" {
 | 
						if opNamespace == "" {
 | 
				
			||||||
		opNamespace = mainNamespace
 | 
							opNamespace = mainNamespace
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -690,3 +769,14 @@ func (r *Store) getKey(mainNamespace string, opNamespace string, key string) str
 | 
				
			|||||||
	b.WriteString(key)
 | 
						b.WriteString(key)
 | 
				
			||||||
	return b.String()
 | 
						return b.String()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (r *Store) connect(ctx context.Context) (err error) {
 | 
				
			||||||
 | 
						if r.isConnected.Load() == 0 {
 | 
				
			||||||
 | 
							if err = r.cli.Ping(ctx).Err(); err != nil {
 | 
				
			||||||
 | 
								setSpanError(ctx, err)
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						r.isConnected.Store(1)
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -7,6 +7,7 @@ import (
 | 
				
			|||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						goredis "github.com/redis/go-redis/v9"
 | 
				
			||||||
	"go.unistack.org/micro/v3/store"
 | 
						"go.unistack.org/micro/v3/store"
 | 
				
			||||||
	"go.unistack.org/micro/v3/tracer"
 | 
						"go.unistack.org/micro/v3/tracer"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -41,7 +42,7 @@ func TestKeepTTL(t *testing.T) {
 | 
				
			|||||||
func Test_rkv_configure(t *testing.T) {
 | 
					func Test_rkv_configure(t *testing.T) {
 | 
				
			||||||
	type fields struct {
 | 
						type fields struct {
 | 
				
			||||||
		options store.Options
 | 
							options store.Options
 | 
				
			||||||
		Client  *wrappedClient
 | 
							Client  goredis.UniversalClient
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	type wantValues struct {
 | 
						type wantValues struct {
 | 
				
			||||||
		username string
 | 
							username string
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										22
									
								
								stats.go
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								stats.go
									
									
									
									
									
								
							@@ -3,7 +3,8 @@ package redis
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
						goredis "github.com/redis/go-redis/v9"
 | 
				
			||||||
 | 
						"go.unistack.org/micro/v3/meter"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var (
 | 
					var (
 | 
				
			||||||
@@ -13,34 +14,26 @@ var (
 | 
				
			|||||||
	PoolConnTotalCurrent = "pool_conn_total_current"
 | 
						PoolConnTotalCurrent = "pool_conn_total_current"
 | 
				
			||||||
	PoolConnIdleCurrent  = "pool_conn_idle_current"
 | 
						PoolConnIdleCurrent  = "pool_conn_idle_current"
 | 
				
			||||||
	PoolConnStaleTotal   = "pool_conn_stale_total"
 | 
						PoolConnStaleTotal   = "pool_conn_stale_total"
 | 
				
			||||||
 | 
					 | 
				
			||||||
	meterRequestTotal               = "request_total"
 | 
					 | 
				
			||||||
	meterRequestLatencyMicroseconds = "latency_microseconds"
 | 
					 | 
				
			||||||
	meterRequestDurationSeconds     = "request_duration_seconds"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type Statser interface {
 | 
					type Statser interface {
 | 
				
			||||||
	PoolStats() *redis.PoolStats
 | 
						PoolStats() *goredis.PoolStats
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Store) statsMeter() {
 | 
					func (r *Store) statsMeter() {
 | 
				
			||||||
	var st Statser
 | 
						var st Statser
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if r.cli.Client != nil {
 | 
						if r.cli != nil {
 | 
				
			||||||
		st = r.cli.Client
 | 
							st = r.cli
 | 
				
			||||||
	} else if r.cli.ClusterClient != nil {
 | 
					 | 
				
			||||||
		st = r.cli.ClusterClient
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		ticker := time.NewTicker(DefaultMeterStatsInterval)
 | 
							ticker := time.NewTicker(meter.DefaultMeterStatsInterval)
 | 
				
			||||||
		defer ticker.Stop()
 | 
							defer ticker.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		for {
 | 
							for _ = range ticker.C {
 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case <-ticker.C:
 | 
					 | 
				
			||||||
			if st == nil {
 | 
								if st == nil {
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -52,6 +45,5 @@ func (r *Store) statsMeter() {
 | 
				
			|||||||
			r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns))
 | 
								r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns))
 | 
				
			||||||
			r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns))
 | 
								r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns))
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										65
									
								
								tracer.go
									
									
									
									
									
								
							
							
						
						
									
										65
									
								
								tracer.go
									
									
									
									
									
								
							@@ -6,27 +6,25 @@ import (
 | 
				
			|||||||
	"net"
 | 
						"net"
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/redis/go-redis/extra/rediscmd/v9"
 | 
						rediscmd "github.com/redis/go-redis/extra/rediscmd/v9"
 | 
				
			||||||
	"github.com/redis/go-redis/v9"
 | 
						goredis "github.com/redis/go-redis/v9"
 | 
				
			||||||
	"go.unistack.org/micro/v3/tracer"
 | 
						"go.unistack.org/micro/v3/tracer"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func setTracing(rdb redis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) {
 | 
					func setTracing(rdb goredis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) {
 | 
				
			||||||
	switch rdb := rdb.(type) {
 | 
						switch rdb := rdb.(type) {
 | 
				
			||||||
	case *redis.Client:
 | 
						case *goredis.Client:
 | 
				
			||||||
		opt := rdb.Options()
 | 
							opt := rdb.Options()
 | 
				
			||||||
		connString := formatDBConnString(opt.Network, opt.Addr)
 | 
							connString := formatDBConnString(opt.Network, opt.Addr)
 | 
				
			||||||
		rdb.AddHook(newTracingHook(connString, tr))
 | 
							rdb.AddHook(newTracingHook(connString, tr))
 | 
				
			||||||
	case *redis.ClusterClient:
 | 
						case *goredis.ClusterClient:
 | 
				
			||||||
		rdb.AddHook(newTracingHook("", tr, opts...))
 | 
							rdb.OnNewNode(func(rdb *goredis.Client) {
 | 
				
			||||||
		rdb.OnNewNode(func(rdb *redis.Client) {
 | 
					 | 
				
			||||||
			opt := rdb.Options()
 | 
								opt := rdb.Options()
 | 
				
			||||||
			connString := formatDBConnString(opt.Network, opt.Addr)
 | 
								connString := formatDBConnString(opt.Network, opt.Addr)
 | 
				
			||||||
			rdb.AddHook(newTracingHook(connString, tr))
 | 
								rdb.AddHook(newTracingHook(connString, tr))
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
	case *redis.Ring:
 | 
						case *goredis.Ring:
 | 
				
			||||||
		rdb.AddHook(newTracingHook("", tr, opts...))
 | 
							rdb.OnNewNode(func(rdb *goredis.Client) {
 | 
				
			||||||
		rdb.OnNewNode(func(rdb *redis.Client) {
 | 
					 | 
				
			||||||
			opt := rdb.Options()
 | 
								opt := rdb.Options()
 | 
				
			||||||
			connString := formatDBConnString(opt.Network, opt.Addr)
 | 
								connString := formatDBConnString(opt.Network, opt.Addr)
 | 
				
			||||||
			rdb.AddHook(newTracingHook(connString, tr))
 | 
								rdb.AddHook(newTracingHook(connString, tr))
 | 
				
			||||||
@@ -39,7 +37,7 @@ type tracingHook struct {
 | 
				
			|||||||
	opts []tracer.SpanOption
 | 
						opts []tracer.SpanOption
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var _ redis.Hook = (*tracingHook)(nil)
 | 
					var _ goredis.Hook = (*tracingHook)(nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook {
 | 
					func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook {
 | 
				
			||||||
	opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient))
 | 
						opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient))
 | 
				
			||||||
@@ -53,45 +51,54 @@ func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOpti
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
 | 
					func (h *tracingHook) DialHook(hook goredis.DialHook) goredis.DialHook {
 | 
				
			||||||
	return func(ctx context.Context, network, addr string) (net.Conn, error) {
 | 
						return func(ctx context.Context, network, addr string) (net.Conn, error) {
 | 
				
			||||||
		sctx, span := h.tr.Start(ctx, "redis.dial", h.opts...)
 | 
							/*
 | 
				
			||||||
 | 
								_, span := h.tr.Start(ctx, "goredis.dial", h.opts...)
 | 
				
			||||||
			defer span.Finish()
 | 
								defer span.Finish()
 | 
				
			||||||
 | 
							*/
 | 
				
			||||||
		conn, err := hook(sctx, network, addr)
 | 
							conn, err := hook(ctx, network, addr)
 | 
				
			||||||
		recordError(span, err)
 | 
							// recordError(span, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return conn, err
 | 
							return conn, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
 | 
					func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook {
 | 
				
			||||||
	return func(ctx context.Context, cmd redis.Cmder) error {
 | 
						return func(ctx context.Context, cmd goredis.Cmder) error {
 | 
				
			||||||
		cmdString := rediscmd.CmdString(cmd)
 | 
							cmdString := rediscmd.CmdString(cmd)
 | 
				
			||||||
 | 
							var err error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		sctx, span := h.tr.Start(ctx, "redis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...)
 | 
							switch cmdString {
 | 
				
			||||||
		defer span.Finish()
 | 
							case "cluster slots":
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
		err := hook(sctx, cmd)
 | 
							default:
 | 
				
			||||||
 | 
								_, span := h.tr.Start(ctx, "goredis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...)
 | 
				
			||||||
 | 
								defer func() {
 | 
				
			||||||
				recordError(span, err)
 | 
									recordError(span, err)
 | 
				
			||||||
 | 
									span.Finish()
 | 
				
			||||||
 | 
								}()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							err = hook(ctx, cmd)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
 | 
					func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook {
 | 
				
			||||||
	return func(ctx context.Context, cmds []redis.Cmder) error {
 | 
						return func(ctx context.Context, cmds []goredis.Cmder) error {
 | 
				
			||||||
		_, cmdsString := rediscmd.CmdsString(cmds)
 | 
							_, cmdsString := rediscmd.CmdsString(cmds)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		opts := append(h.opts, tracer.WithSpanLabels(
 | 
							opts := append(h.opts, tracer.WithSpanLabels(
 | 
				
			||||||
			"db.redis.num_cmd", strconv.Itoa(len(cmds)),
 | 
								"db.goredis.num_cmd", strconv.Itoa(len(cmds)),
 | 
				
			||||||
			"db.statement", cmdsString,
 | 
								"db.statement", cmdsString,
 | 
				
			||||||
		))
 | 
							))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		sctx, span := h.tr.Start(ctx, "redis.process_pipeline", opts...)
 | 
							_, span := h.tr.Start(ctx, "goredis.process_pipeline", opts...)
 | 
				
			||||||
		defer span.Finish()
 | 
							defer span.Finish()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		err := hook(sctx, cmds)
 | 
							err := hook(ctx, cmds)
 | 
				
			||||||
		recordError(span, err)
 | 
							recordError(span, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
@@ -99,7 +106,7 @@ func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func setSpanError(ctx context.Context, err error) {
 | 
					func setSpanError(ctx context.Context, err error) {
 | 
				
			||||||
	if err == nil || err == redis.Nil {
 | 
						if err == nil || err == goredis.Nil {
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil {
 | 
						if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil {
 | 
				
			||||||
@@ -108,7 +115,7 @@ func setSpanError(ctx context.Context, err error) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func recordError(span tracer.Span, err error) {
 | 
					func recordError(span tracer.Span, err error) {
 | 
				
			||||||
	if err != nil && err != redis.Nil {
 | 
						if err != nil && err != goredis.Nil {
 | 
				
			||||||
		span.SetStatus(tracer.SpanStatusError, err.Error())
 | 
							span.SetStatus(tracer.SpanStatusError, err.Error())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user