Compare commits
	
		
			21 Commits
		
	
	
		
			v3.10.6
			...
			b2d89018b8
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b2d89018b8 | |||
| 0981f89f60 | |||
| 332fe5f4d4 | |||
| 757fe0245b | |||
| 27eccc1ed2 | |||
| 7c641fa8ac | |||
| 24c9f20196 | |||
| 953b5b0021 | |||
| 87e2e2b947 | |||
| 256e61a437 | |||
| f9cdd41c94 | |||
| ecad15fe17 | |||
| fa3d18b353 | |||
| 2f3951773f | |||
| b263e14032 | |||
| 518cc1db73 | |||
| 4484cd34ec | |||
| 7bceeee6bf | |||
| aed9512b93 | |||
| de72a10973 | |||
| 62c2de51d4 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| .idea | ||||
							
								
								
									
										15
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								go.mod
									
									
									
									
									
								
							| @@ -1,14 +1,19 @@ | ||||
| module go.unistack.org/micro-store-redis/v3 | ||||
|  | ||||
| go 1.20 | ||||
| go 1.22 | ||||
|  | ||||
| toolchain go1.22.4 | ||||
|  | ||||
| require ( | ||||
| 	github.com/redis/go-redis/v9 v9.2.1 | ||||
| 	go.unistack.org/micro/v3 v3.10.62 | ||||
| 	github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 | ||||
| 	github.com/redis/go-redis/v9 v9.7.0 | ||||
| 	go.unistack.org/micro/v3 v3.10.105 | ||||
| ) | ||||
|  | ||||
| require ( | ||||
| 	github.com/cespare/xxhash/v2 v2.2.0 // indirect | ||||
| 	github.com/cespare/xxhash/v2 v2.3.0 // indirect | ||||
| 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | ||||
| 	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect | ||||
| 	github.com/google/go-cmp v0.6.0 // indirect | ||||
| 	go.unistack.org/micro-proto/v3 v3.4.1 // indirect | ||||
| 	google.golang.org/protobuf v1.35.2 // indirect | ||||
| ) | ||||
|   | ||||
							
								
								
									
										24
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								go.sum
									
									
									
									
									
								
							| @@ -1,12 +1,20 @@ | ||||
| github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= | ||||
| github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= | ||||
| github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= | ||||
| github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= | ||||
| github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= | ||||
| github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= | ||||
| github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= | ||||
| github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= | ||||
| github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= | ||||
| github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= | ||||
| github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= | ||||
| github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= | ||||
| github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= | ||||
| github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= | ||||
| go.unistack.org/micro/v3 v3.10.62 h1:PCwLSt3W53UGosH/5qU3kU0iJxK8jlKOm9p4v/Zti5o= | ||||
| go.unistack.org/micro/v3 v3.10.62/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= | ||||
| github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||||
| github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||||
| github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk= | ||||
| github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik= | ||||
| github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= | ||||
| github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= | ||||
| go.unistack.org/micro/v3 v3.10.105 h1:JYNV0d+fnR7Hy8d4/sjr+25DbSNqq1Z7IPeDDdB+f1I= | ||||
| go.unistack.org/micro/v3 v3.10.105/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g= | ||||
| google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= | ||||
| google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= | ||||
|   | ||||
							
								
								
									
										55
									
								
								options.go
									
									
									
									
									
								
							
							
						
						
									
										55
									
								
								options.go
									
									
									
									
									
								
							| @@ -1,18 +1,67 @@ | ||||
| package redis | ||||
|  | ||||
| import ( | ||||
| 	"github.com/redis/go-redis/v9" | ||||
| 	goredis "github.com/redis/go-redis/v9" | ||||
| 	"go.unistack.org/micro/v3/logger" | ||||
| 	"go.unistack.org/micro/v3/meter" | ||||
| 	"go.unistack.org/micro/v3/store" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
| ) | ||||
|  | ||||
| type configKey struct{} | ||||
|  | ||||
| func Config(c *redis.Options) store.Option { | ||||
| func Config(c *goredis.Options) store.Option { | ||||
| 	return store.SetOption(configKey{}, c) | ||||
| } | ||||
|  | ||||
| type clusterConfigKey struct{} | ||||
|  | ||||
| func ClusterConfig(c *redis.ClusterOptions) store.Option { | ||||
| func ClusterConfig(c *goredis.ClusterOptions) store.Option { | ||||
| 	return store.SetOption(clusterConfigKey{}, c) | ||||
| } | ||||
|  | ||||
| type universalConfigKey struct{} | ||||
|  | ||||
| func UniversalConfig(c *goredis.UniversalOptions) store.Option { | ||||
| 	return store.SetOption(universalConfigKey{}, c) | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	labelHost = "redis_host" | ||||
| 	labelName = "redis_name" | ||||
| ) | ||||
|  | ||||
| // Options struct holds wrapper options | ||||
| type Options struct { | ||||
| 	Logger    logger.Logger | ||||
| 	Meter     meter.Meter | ||||
| 	Tracer    tracer.Tracer | ||||
| 	RedisHost string | ||||
| 	RedisName string | ||||
| } | ||||
|  | ||||
| // Option func signature | ||||
| type Option func(*Options) | ||||
|  | ||||
| // NewOptions create new Options struct from provided option slice | ||||
| func NewOptions(opts ...Option) Options { | ||||
| 	options := Options{ | ||||
| 		Logger: logger.DefaultLogger, | ||||
| 		Meter:  meter.DefaultMeter, | ||||
| 		Tracer: tracer.DefaultTracer, | ||||
| 	} | ||||
|  | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	options.Meter = options.Meter.Clone( | ||||
| 		meter.Labels( | ||||
| 			labelHost, options.RedisHost, | ||||
| 			labelName, options.RedisName), | ||||
| 	) | ||||
|  | ||||
| 	options.Logger = options.Logger.Clone(logger.WithAddCallerSkipCount(1)) | ||||
|  | ||||
| 	return options | ||||
| } | ||||
|   | ||||
							
								
								
									
										529
									
								
								redis.go
									
									
									
									
									
								
							
							
						
						
									
										529
									
								
								redis.go
									
									
									
									
									
								
							| @@ -2,22 +2,21 @@ package redis | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"strings" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	redis "github.com/redis/go-redis/v9" | ||||
| 	goredis "github.com/redis/go-redis/v9" | ||||
| 	"go.unistack.org/micro/v3/semconv" | ||||
| 	"go.unistack.org/micro/v3/store" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
| 	pool "go.unistack.org/micro/v3/util/xpool" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	DefaultPathSeparator = "/" | ||||
|  | ||||
| 	DefaultClusterOptions = &redis.ClusterOptions{ | ||||
| 	DefaultUniversalOptions = &goredis.UniversalOptions{ | ||||
| 		Username:        "", | ||||
| 		Password:        "", // no password set | ||||
| 		MaxRetries:      2, | ||||
| @@ -29,7 +28,19 @@ var ( | ||||
| 		MinIdleConns:    10, | ||||
| 	} | ||||
|  | ||||
| 	DefaultOptions = &redis.Options{ | ||||
| 	DefaultClusterOptions = &goredis.ClusterOptions{ | ||||
| 		Username:        "", | ||||
| 		Password:        "", // no password set | ||||
| 		MaxRetries:      2, | ||||
| 		MaxRetryBackoff: 256 * time.Millisecond, | ||||
| 		DialTimeout:     1 * time.Second, | ||||
| 		ReadTimeout:     1 * time.Second, | ||||
| 		WriteTimeout:    1 * time.Second, | ||||
| 		PoolTimeout:     1 * time.Second, | ||||
| 		MinIdleConns:    10, | ||||
| 	} | ||||
|  | ||||
| 	DefaultOptions = &goredis.Options{ | ||||
| 		Username:        "", | ||||
| 		Password:        "", // no password set | ||||
| 		DB:              0,  // use default DB | ||||
| @@ -44,27 +55,22 @@ var ( | ||||
| ) | ||||
|  | ||||
| type Store struct { | ||||
| 	opts store.Options | ||||
| 	cli  redisClient | ||||
| 	pool pool.Pool[strings.Builder] | ||||
| } | ||||
|  | ||||
| type redisClient interface { | ||||
| 	Get(ctx context.Context, key string) *redis.StringCmd | ||||
| 	Del(ctx context.Context, keys ...string) *redis.IntCmd | ||||
| 	Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd | ||||
| 	Keys(ctx context.Context, pattern string) *redis.StringSliceCmd | ||||
| 	MGet(ctx context.Context, keys ...string) *redis.SliceCmd | ||||
| 	MSet(ctx context.Context, kv ...interface{}) *redis.StatusCmd | ||||
| 	Exists(ctx context.Context, keys ...string) *redis.IntCmd | ||||
| 	Ping(ctx context.Context) *redis.StatusCmd | ||||
| 	Pipeline() redis.Pipeliner | ||||
| 	Pipelined(ctx context.Context, fn func(redis.Pipeliner) error) ([]redis.Cmder, error) | ||||
| 	Close() error | ||||
| 	opts        store.Options | ||||
| 	cli         goredis.UniversalClient | ||||
| 	done        chan struct{} | ||||
| 	pool        *pool.StringsPool | ||||
| 	isConnected atomic.Int32 | ||||
| } | ||||
|  | ||||
| func (r *Store) Connect(ctx context.Context) error { | ||||
| 	return r.cli.Ping(ctx).Err() | ||||
| 	if r.cli == nil { | ||||
| 		return store.ErrNotConnected | ||||
| 	} | ||||
| 	if r.opts.LazyConnect { | ||||
| 		return nil | ||||
| 	} | ||||
| 	return r.connect(ctx) | ||||
|  | ||||
| } | ||||
|  | ||||
| func (r *Store) Init(opts ...store.Option) error { | ||||
| @@ -72,18 +78,55 @@ func (r *Store) Init(opts ...store.Option) error { | ||||
| 		o(&r.opts) | ||||
| 	} | ||||
|  | ||||
| 	return r.configure() | ||||
| 	err := r.configure() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (r *Store) Redis() *redis.Client { | ||||
| 	return r.cli.(*redis.Client) | ||||
| func (r *Store) Client() *goredis.Client { | ||||
| 	if c, ok := r.cli.(*goredis.Client); ok { | ||||
| 		return c | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (r *Store) UniversalClient() goredis.UniversalClient { | ||||
| 	return r.cli | ||||
| } | ||||
|  | ||||
| func (r *Store) ClusterClient() *goredis.ClusterClient { | ||||
| 	if c, ok := r.cli.(*goredis.ClusterClient); ok { | ||||
| 		return c | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (r *Store) Disconnect(ctx context.Context) error { | ||||
| 	return r.cli.Close() | ||||
| 	var err error | ||||
| 	select { | ||||
| 	case <-r.done: | ||||
| 		return err | ||||
| 	default: | ||||
| 		if r.cli != nil { | ||||
| 			if err = r.cli.Close(); err != nil { | ||||
| 				r.isConnected.Store(0) | ||||
| 			} | ||||
| 		} | ||||
| 		close(r.done) | ||||
| 		return err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	b := r.pool.Get() | ||||
| 	defer r.pool.Put(b) | ||||
| 	options := store.NewExistsOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -97,25 +140,23 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	rkey := r.getKey(r.opts.Namespace, options.Namespace, key) | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, "cache exists "+rkey) | ||||
| 	defer sp.Finish() | ||||
| 	rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	val, err := r.cli.Exists(ctx, rkey).Result() | ||||
| 	setSpanError(ctx, err) | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil || (err == nil && val == 0) { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil || (err == nil && val == 0) { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @@ -123,6 +164,13 @@ func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOpti | ||||
| } | ||||
|  | ||||
| func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	b := r.pool.Get() | ||||
| 	defer r.pool.Put(b) | ||||
|  | ||||
| 	options := store.NewReadOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -136,25 +184,23 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	rkey := r.getKey(r.opts.Namespace, options.Namespace, key) | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, "cache read "+rkey) | ||||
| 	defer sp.Finish() | ||||
| 	rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	buf, err := r.cli.Get(ctx, rkey).Bytes() | ||||
| 	setSpanError(ctx, err) | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil || (err == nil && buf == nil) { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil || (err == nil && buf == nil) { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @@ -165,7 +211,7 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s | ||||
| 		*b = string(buf) | ||||
| 	default: | ||||
| 		if err = r.opts.Codec.Unmarshal(buf, val); err != nil { | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 			setSpanError(ctx, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -173,6 +219,10 @@ func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...s | ||||
| } | ||||
|  | ||||
| func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	options := store.NewReadOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -186,30 +236,42 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	var rkeys []string | ||||
| 	var pools []*strings.Builder | ||||
| 	if r.opts.Namespace != "" || options.Namespace != "" { | ||||
| 		rkeys = make([]string, len(keys)) | ||||
| 		pools = make([]*strings.Builder, len(keys)) | ||||
| 		for idx, key := range keys { | ||||
| 			keys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key) | ||||
| 			b := r.pool.Get() | ||||
| 			pools[idx] = b | ||||
| 			rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mread %v", keys)) | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	rvals, err := r.cli.MGet(ctx, keys...).Result() | ||||
| 	var rvals []interface{} | ||||
| 	var err error | ||||
| 	if r.opts.Namespace != "" || options.Namespace != "" { | ||||
| 		rvals, err = r.cli.MGet(ctx, rkeys...).Result() | ||||
| 		for idx := range pools { | ||||
| 			r.pool.Put(pools[idx]) | ||||
| 		} | ||||
| 	} else { | ||||
| 		rvals, err = r.cli.MGet(ctx, keys...).Result() | ||||
| 	} | ||||
| 	setSpanError(ctx, err) | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil || (len(rvals) == 0) { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil || (len(rvals) == 0) { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @@ -250,7 +312,7 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts | ||||
|  | ||||
| 		itm.Set(reflect.New(vt.Elem())) | ||||
| 		if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil { | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 			setSpanError(ctx, err) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| @@ -260,6 +322,10 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts | ||||
| } | ||||
|  | ||||
| func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	options := store.NewDeleteOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -273,30 +339,41 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	var rkeys []string | ||||
| 	var pools []*strings.Builder | ||||
| 	if r.opts.Namespace != "" || options.Namespace != "" { | ||||
| 		rkeys = make([]string, len(keys)) | ||||
| 		pools = make([]*strings.Builder, len(keys)) | ||||
| 		for idx, key := range keys { | ||||
| 			keys[idx] = r.getKey(r.opts.Namespace, options.Namespace, key) | ||||
| 			b := r.pool.Get() | ||||
| 			pools[idx] = b | ||||
| 			rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mdelete %v", keys)) | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	err := r.cli.Del(ctx, keys...).Err() | ||||
| 	var err error | ||||
| 	if r.opts.Namespace != "" || options.Namespace != "" { | ||||
| 		err = r.cli.Del(ctx, rkeys...).Err() | ||||
| 		for idx := range pools { | ||||
| 			r.pool.Put(pools[idx]) | ||||
| 		} | ||||
| 	} else { | ||||
| 		err = r.cli.Del(ctx, keys...).Err() | ||||
| 	} | ||||
| 	setSpanError(ctx, err) | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @@ -304,6 +381,13 @@ func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.Delete | ||||
| } | ||||
|  | ||||
| func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	b := r.pool.Get() | ||||
| 	defer r.pool.Put(b) | ||||
|  | ||||
| 	options := store.NewDeleteOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -317,24 +401,21 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache delete %v", key)) | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	err := r.cli.Del(ctx, r.getKey(r.opts.Namespace, options.Namespace, key)).Err() | ||||
| 	err := r.cli.Del(ctx, r.getKey(b, r.opts.Namespace, options.Namespace, key)).Err() | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	setSpanError(ctx, err) | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @@ -342,6 +423,10 @@ func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOpti | ||||
| } | ||||
|  | ||||
| func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	options := store.NewWriteOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -355,13 +440,12 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache mwrite %v", keys)) | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	kvs := make([]string, 0, len(keys)*2) | ||||
|  | ||||
| 	pools := make([]*strings.Builder, len(keys)) | ||||
| 	for idx, key := range keys { | ||||
| 		kvs = append(kvs, r.getKey(r.opts.Namespace, options.Namespace, key)) | ||||
| 		b := r.pool.Get() | ||||
| 		pools[idx] = b | ||||
| 		kvs = append(kvs, r.getKey(b, r.opts.Namespace, options.Namespace, key)) | ||||
|  | ||||
| 		switch vt := vals[idx].(type) { | ||||
| 		case string: | ||||
| @@ -371,48 +455,53 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o | ||||
| 		default: | ||||
| 			buf, err := r.opts.Codec.Marshal(vt) | ||||
| 			if err != nil { | ||||
| 				sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 				return err | ||||
| 			} | ||||
| 			kvs = append(kvs, string(buf)) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
|  | ||||
| 	cmds, err := r.cli.Pipelined(ctx, func(pipe redis.Pipeliner) error { | ||||
| 	pipeliner := func(pipe goredis.Pipeliner) error { | ||||
| 		for idx := 0; idx < len(kvs); idx += 2 { | ||||
| 			if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil { | ||||
| 				setSpanError(ctx, err) | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
| 	} | ||||
|  | ||||
| 	ts := time.Now() | ||||
| 	cmds, err := r.cli.Pipelined(ctx, pipeliner) | ||||
| 	for idx := range pools { | ||||
| 		r.pool.Put(pools[idx]) | ||||
| 	} | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	setSpanError(ctx, err) | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	for _, cmd := range cmds { | ||||
| 		if err = cmd.Err(); err != nil { | ||||
| 			if err == redis.Nil { | ||||
| 				r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 			if err == goredis.Nil { | ||||
| 				r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 				return store.ErrNotFound | ||||
| 			} | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 			r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 			setSpanError(ctx, err) | ||||
| 			r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| @@ -421,6 +510,13 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o | ||||
| } | ||||
|  | ||||
| func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	b := r.pool.Get() | ||||
| 	defer r.pool.Put(b) | ||||
|  | ||||
| 	options := store.NewWriteOptions(opts...) | ||||
|  | ||||
| 	timeout := r.opts.Timeout | ||||
| @@ -434,9 +530,7 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	rkey := r.getKey(r.opts.Namespace, options.Namespace, key) | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache write %v", rkey)) | ||||
| 	defer sp.Finish() | ||||
| 	rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) | ||||
|  | ||||
| 	var buf []byte | ||||
| 	switch vt := val.(type) { | ||||
| @@ -448,26 +542,26 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... | ||||
| 		var err error | ||||
| 		buf, err = r.opts.Codec.Marshal(val) | ||||
| 		if err != nil { | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	err := r.cli.Set(ctx, rkey, buf, options.TTL).Err() | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	setSpanError(ctx, err) | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @@ -475,12 +569,19 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... | ||||
| } | ||||
|  | ||||
| func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) { | ||||
| 	if err := r.connect(ctx); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	b := r.pool.Get() | ||||
| 	defer r.pool.Put(b) | ||||
|  | ||||
| 	options := store.NewListOptions(opts...) | ||||
| 	if len(options.Namespace) == 0 { | ||||
| 		options.Namespace = r.opts.Namespace | ||||
| 	} | ||||
|  | ||||
| 	rkey := r.getKey(options.Namespace, "", options.Prefix+"*") | ||||
| 	rkey := r.getKey(b, options.Namespace, "", options.Prefix+"*") | ||||
| 	if options.Suffix != "" { | ||||
| 		rkey += options.Suffix | ||||
| 	} | ||||
| @@ -496,25 +597,37 @@ func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, e | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	ctx, sp := r.opts.Tracer.Start(ctx, fmt.Sprintf("cache list %v", rkey)) | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	// TODO: add support for prefix/suffix/limit | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Inc() | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() | ||||
| 	ts := time.Now() | ||||
| 	keys, err := r.cli.Keys(ctx, rkey).Result() | ||||
| 	var keys []string | ||||
| 	var err error | ||||
|  | ||||
| 	if c, ok := r.cli.(*goredis.ClusterClient); ok { | ||||
| 		err = c.ForEachMaster(ctx, func(nctx context.Context, cli *goredis.Client) error { | ||||
| 			nkeys, nerr := cli.Keys(nctx, rkey).Result() | ||||
| 			if nerr != nil { | ||||
| 				return nerr | ||||
| 			} | ||||
| 			keys = append(keys, nkeys...) | ||||
| 			return nil | ||||
| 		}) | ||||
| 	} else { | ||||
| 		keys, err = r.cli.Keys(ctx, rkey).Result() | ||||
| 	} | ||||
| 	te := time.Since(ts) | ||||
| 	r.opts.Meter.Counter(semconv.CacheRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.CacheRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.CacheRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == redis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 	setSpanError(ctx, err) | ||||
|  | ||||
| 	r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() | ||||
| 	r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) | ||||
| 	if err == goredis.Nil { | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() | ||||
| 		return nil, store.ErrNotFound | ||||
| 	} else if err == nil { | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() | ||||
| 	} else if err != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		r.opts.Meter.Counter(semconv.CacheRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| @@ -546,75 +659,106 @@ func (r *Store) String() string { | ||||
| } | ||||
|  | ||||
| func NewStore(opts ...store.Option) *Store { | ||||
| 	return &Store{opts: store.NewOptions(opts...)} | ||||
| 	return &Store{done: make(chan struct{}), opts: store.NewOptions(opts...)} | ||||
| } | ||||
|  | ||||
| func (r *Store) configure() error { | ||||
| 	var redisOptions *redis.Options | ||||
| 	var redisClusterOptions *redis.ClusterOptions | ||||
| 	var err error | ||||
|  | ||||
| 	nodes := r.opts.Addrs | ||||
|  | ||||
| 	if len(nodes) == 0 { | ||||
| 		nodes = []string{"redis://127.0.0.1:6379"} | ||||
| 	} | ||||
|  | ||||
| 	if r.cli != nil && r.opts.Context == nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	universalOptions := DefaultUniversalOptions | ||||
|  | ||||
| 	if r.opts.Context != nil { | ||||
| 		if c, ok := r.opts.Context.Value(configKey{}).(*redis.Options); ok { | ||||
| 			redisOptions = c | ||||
| 		if o, ok := r.opts.Context.Value(configKey{}).(*goredis.Options); ok { | ||||
| 			universalOptions.Addrs = []string{o.Addr} | ||||
| 			universalOptions.Dialer = o.Dialer | ||||
| 			universalOptions.OnConnect = o.OnConnect | ||||
| 			universalOptions.Username = o.Username | ||||
| 			universalOptions.Password = o.Password | ||||
|  | ||||
| 			universalOptions.MaxRetries = o.MaxRetries | ||||
| 			universalOptions.MinRetryBackoff = o.MinRetryBackoff | ||||
| 			universalOptions.MaxRetryBackoff = o.MaxRetryBackoff | ||||
|  | ||||
| 			universalOptions.DialTimeout = o.DialTimeout | ||||
| 			universalOptions.ReadTimeout = o.ReadTimeout | ||||
| 			universalOptions.WriteTimeout = o.WriteTimeout | ||||
| 			universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled | ||||
|  | ||||
| 			universalOptions.PoolFIFO = o.PoolFIFO | ||||
|  | ||||
| 			universalOptions.PoolSize = o.PoolSize | ||||
| 			universalOptions.PoolTimeout = o.PoolTimeout | ||||
| 			universalOptions.MinIdleConns = o.MinIdleConns | ||||
| 			universalOptions.MaxIdleConns = o.MaxIdleConns | ||||
| 			universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime | ||||
| 			universalOptions.ConnMaxLifetime = o.ConnMaxLifetime | ||||
|  | ||||
| 			if r.opts.TLSConfig != nil { | ||||
| 				redisOptions.TLSConfig = r.opts.TLSConfig | ||||
| 				universalOptions.TLSConfig = r.opts.TLSConfig | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if c, ok := r.opts.Context.Value(clusterConfigKey{}).(*redis.ClusterOptions); ok { | ||||
| 			redisClusterOptions = c | ||||
| 		if o, ok := r.opts.Context.Value(clusterConfigKey{}).(*goredis.ClusterOptions); ok { | ||||
| 			universalOptions.Addrs = o.Addrs | ||||
| 			universalOptions.Dialer = o.Dialer | ||||
| 			universalOptions.OnConnect = o.OnConnect | ||||
| 			universalOptions.Username = o.Username | ||||
| 			universalOptions.Password = o.Password | ||||
|  | ||||
| 			universalOptions.MaxRedirects = o.MaxRedirects | ||||
| 			universalOptions.ReadOnly = o.ReadOnly | ||||
| 			universalOptions.RouteByLatency = o.RouteByLatency | ||||
| 			universalOptions.RouteRandomly = o.RouteRandomly | ||||
|  | ||||
| 			universalOptions.MaxRetries = o.MaxRetries | ||||
| 			universalOptions.MinRetryBackoff = o.MinRetryBackoff | ||||
| 			universalOptions.MaxRetryBackoff = o.MaxRetryBackoff | ||||
|  | ||||
| 			universalOptions.DialTimeout = o.DialTimeout | ||||
| 			universalOptions.ReadTimeout = o.ReadTimeout | ||||
| 			universalOptions.WriteTimeout = o.WriteTimeout | ||||
| 			universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled | ||||
|  | ||||
| 			universalOptions.PoolFIFO = o.PoolFIFO | ||||
|  | ||||
| 			universalOptions.PoolSize = o.PoolSize | ||||
| 			universalOptions.PoolTimeout = o.PoolTimeout | ||||
| 			universalOptions.MinIdleConns = o.MinIdleConns | ||||
| 			universalOptions.MaxIdleConns = o.MaxIdleConns | ||||
| 			universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime | ||||
| 			universalOptions.ConnMaxLifetime = o.ConnMaxLifetime | ||||
| 			if r.opts.TLSConfig != nil { | ||||
| 				redisClusterOptions.TLSConfig = r.opts.TLSConfig | ||||
| 				universalOptions.TLSConfig = r.opts.TLSConfig | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if o, ok := r.opts.Context.Value(universalConfigKey{}).(*goredis.UniversalOptions); ok { | ||||
| 			universalOptions = o | ||||
| 			if r.opts.TLSConfig != nil { | ||||
| 				universalOptions.TLSConfig = r.opts.TLSConfig | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if redisOptions != nil && redisClusterOptions != nil { | ||||
| 		return fmt.Errorf("must specify only one option Config or ClusterConfig") | ||||
| 	if len(r.opts.Addrs) > 0 { | ||||
| 		universalOptions.Addrs = r.opts.Addrs | ||||
| 	} else { | ||||
| 		universalOptions.Addrs = []string{"127.0.0.1:6379"} | ||||
| 	} | ||||
|  | ||||
| 	if redisOptions == nil && redisClusterOptions == nil && r.cli != nil { | ||||
| 		return nil | ||||
| 	} | ||||
| 	r.cli = goredis.NewUniversalClient(universalOptions) | ||||
| 	setTracing(r.cli, r.opts.Tracer) | ||||
|  | ||||
| 	if redisOptions == nil && redisClusterOptions == nil && len(nodes) == 1 { | ||||
| 		redisOptions, err = redis.ParseURL(nodes[0]) | ||||
| 		if err != nil { | ||||
| 			redisOptions = DefaultOptions | ||||
| 			redisOptions.Addr = r.opts.Addrs[0] | ||||
| 			redisOptions.TLSConfig = r.opts.TLSConfig | ||||
| 		} | ||||
| 	} else if redisOptions == nil && redisClusterOptions == nil && len(nodes) > 1 { | ||||
| 		redisClusterOptions = DefaultClusterOptions | ||||
| 		redisClusterOptions.Addrs = r.opts.Addrs | ||||
| 		redisClusterOptions.TLSConfig = r.opts.TLSConfig | ||||
| 	} | ||||
| 	r.pool = pool.NewStringsPool(50) | ||||
|  | ||||
| 	if redisOptions != nil { | ||||
| 		r.cli = redis.NewClient(redisOptions) | ||||
| 	} else if redisClusterOptions != nil { | ||||
| 		r.cli = redis.NewClusterClient(redisClusterOptions) | ||||
| 	} | ||||
| 	r.statsMeter() | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (r *Store) getKey(mainNamespace string, opNamespace string, key string) string { | ||||
| 	b := r.pool.Get() | ||||
| 	defer r.pool.Put(b) | ||||
| 	b.Reset() | ||||
|  | ||||
| func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace string, key string) string { | ||||
| 	if opNamespace == "" { | ||||
| 		opNamespace = mainNamespace | ||||
| 	} | ||||
| @@ -625,3 +769,14 @@ func (r *Store) getKey(mainNamespace string, opNamespace string, key string) str | ||||
| 	b.WriteString(key) | ||||
| 	return b.String() | ||||
| } | ||||
|  | ||||
| func (r *Store) connect(ctx context.Context) (err error) { | ||||
| 	if r.isConnected.Load() == 0 { | ||||
| 		if err = r.cli.Ping(ctx).Err(); err != nil { | ||||
| 			setSpanError(ctx, err) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	r.isConnected.Store(1) | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -7,14 +7,42 @@ import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/redis/go-redis/v9" | ||||
| 	goredis "github.com/redis/go-redis/v9" | ||||
| 	"go.unistack.org/micro/v3/store" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
| ) | ||||
|  | ||||
| func TestKeepTTL(t *testing.T) { | ||||
| 	ctx := context.Background() | ||||
|  | ||||
| 	if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { | ||||
| 		t.Skip() | ||||
| 	} | ||||
| 	r := NewStore(store.Addrs(os.Getenv("STORE_NODES"))) | ||||
|  | ||||
| 	if err := r.Init(); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if err := r.Connect(ctx); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	key := "key" | ||||
| 	err := r.Write(ctx, key, "val1", store.WriteTTL(15*time.Second)) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Write error: %v", err) | ||||
| 	} | ||||
| 	time.Sleep(3 * time.Second) | ||||
| 	err = r.Write(ctx, key, "val2", store.WriteTTL(-1)) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Write error: %v", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func Test_rkv_configure(t *testing.T) { | ||||
| 	type fields struct { | ||||
| 		options store.Options | ||||
| 		Client  *redis.Client | ||||
| 		Client  goredis.UniversalClient | ||||
| 	} | ||||
| 	type wantValues struct { | ||||
| 		username string | ||||
| @@ -37,7 +65,7 @@ func Test_rkv_configure(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "legacy Url", fields: fields{options: store.Options{Addrs: []string{"127.0.0.1:6379"}}, Client: nil}, | ||||
| 			name: "legacy Url", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"127.0.0.1:6379"}}, Client: nil}, | ||||
| 			wantErr: false, want: wantValues{ | ||||
| 				username: "", | ||||
| 				password: "", | ||||
| @@ -45,7 +73,7 @@ func Test_rkv_configure(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "New Url", fields: fields{options: store.Options{Addrs: []string{"redis://127.0.0.1:6379"}}, Client: nil}, | ||||
| 			name: "New Url", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"redis://127.0.0.1:6379"}}, Client: nil}, | ||||
| 			wantErr: false, want: wantValues{ | ||||
| 				username: "", | ||||
| 				password: "", | ||||
| @@ -53,7 +81,7 @@ func Test_rkv_configure(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "Url with Pwd", fields: fields{options: store.Options{Addrs: []string{"redis://:password@redis:6379"}}, Client: nil}, | ||||
| 			name: "Url with Pwd", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"redis://:password@redis:6379"}}, Client: nil}, | ||||
| 			wantErr: false, want: wantValues{ | ||||
| 				username: "", | ||||
| 				password: "password", | ||||
| @@ -61,7 +89,7 @@ func Test_rkv_configure(t *testing.T) { | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "Url with username and Pwd", fields: fields{options: store.Options{Addrs: []string{"redis://username:password@redis:6379"}}, Client: nil}, | ||||
| 			name: "Url with username and Pwd", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"redis://username:password@redis:6379"}}, Client: nil}, | ||||
| 			wantErr: false, want: wantValues{ | ||||
| 				username: "username", | ||||
| 				password: "password", | ||||
|   | ||||
							
								
								
									
										49
									
								
								stats.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								stats.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,49 @@ | ||||
| package redis | ||||
|  | ||||
| import ( | ||||
| 	"time" | ||||
|  | ||||
| 	goredis "github.com/redis/go-redis/v9" | ||||
| 	"go.unistack.org/micro/v3/meter" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	PoolHitsTotal        = "pool_hits_total" | ||||
| 	PoolMissesTotal      = "pool_misses_total" | ||||
| 	PoolTimeoutTotal     = "pool_timeout_total" | ||||
| 	PoolConnTotalCurrent = "pool_conn_total_current" | ||||
| 	PoolConnIdleCurrent  = "pool_conn_idle_current" | ||||
| 	PoolConnStaleTotal   = "pool_conn_stale_total" | ||||
| ) | ||||
|  | ||||
| type Statser interface { | ||||
| 	PoolStats() *goredis.PoolStats | ||||
| } | ||||
|  | ||||
| func (r *Store) statsMeter() { | ||||
| 	var st Statser | ||||
|  | ||||
| 	if r.cli != nil { | ||||
| 		st = r.cli | ||||
| 	} else { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		ticker := time.NewTicker(meter.DefaultMeterStatsInterval) | ||||
| 		defer ticker.Stop() | ||||
|  | ||||
| 		for _ = range ticker.C { | ||||
| 			if st == nil { | ||||
| 				return | ||||
| 			} | ||||
| 			stats := st.PoolStats() | ||||
| 			r.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits)) | ||||
| 			r.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses)) | ||||
| 			r.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts)) | ||||
| 			r.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns)) | ||||
| 			r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns)) | ||||
| 			r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns)) | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
							
								
								
									
										128
									
								
								tracer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										128
									
								
								tracer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,128 @@ | ||||
| package redis | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| 	"strconv" | ||||
|  | ||||
| 	rediscmd "github.com/redis/go-redis/extra/rediscmd/v9" | ||||
| 	goredis "github.com/redis/go-redis/v9" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
| ) | ||||
|  | ||||
| func setTracing(rdb goredis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) { | ||||
| 	switch rdb := rdb.(type) { | ||||
| 	case *goredis.Client: | ||||
| 		opt := rdb.Options() | ||||
| 		connString := formatDBConnString(opt.Network, opt.Addr) | ||||
| 		rdb.AddHook(newTracingHook(connString, tr)) | ||||
| 	case *goredis.ClusterClient: | ||||
| 		rdb.OnNewNode(func(rdb *goredis.Client) { | ||||
| 			opt := rdb.Options() | ||||
| 			connString := formatDBConnString(opt.Network, opt.Addr) | ||||
| 			rdb.AddHook(newTracingHook(connString, tr)) | ||||
| 		}) | ||||
| 	case *goredis.Ring: | ||||
| 		rdb.OnNewNode(func(rdb *goredis.Client) { | ||||
| 			opt := rdb.Options() | ||||
| 			connString := formatDBConnString(opt.Network, opt.Addr) | ||||
| 			rdb.AddHook(newTracingHook(connString, tr)) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type tracingHook struct { | ||||
| 	tr   tracer.Tracer | ||||
| 	opts []tracer.SpanOption | ||||
| } | ||||
|  | ||||
| var _ goredis.Hook = (*tracingHook)(nil) | ||||
|  | ||||
| func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook { | ||||
| 	opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient)) | ||||
| 	if connString != "" { | ||||
| 		opts = append(opts, tracer.WithSpanLabels("db.connection_string", connString)) | ||||
| 	} | ||||
|  | ||||
| 	return &tracingHook{ | ||||
| 		tr:   tr, | ||||
| 		opts: opts, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (h *tracingHook) DialHook(hook goredis.DialHook) goredis.DialHook { | ||||
| 	return func(ctx context.Context, network, addr string) (net.Conn, error) { | ||||
| 		/* | ||||
| 			_, span := h.tr.Start(ctx, "goredis.dial", h.opts...) | ||||
| 			defer span.Finish() | ||||
| 		*/ | ||||
| 		conn, err := hook(ctx, network, addr) | ||||
| 		// recordError(span, err) | ||||
|  | ||||
| 		return conn, err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { | ||||
| 	return func(ctx context.Context, cmd goredis.Cmder) error { | ||||
| 		cmdString := rediscmd.CmdString(cmd) | ||||
| 		var err error | ||||
|  | ||||
| 		switch cmdString { | ||||
| 		case "cluster slots": | ||||
| 			break | ||||
| 		default: | ||||
| 			_, span := h.tr.Start(ctx, "goredis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) | ||||
| 			defer func() { | ||||
| 				recordError(span, err) | ||||
| 				span.Finish() | ||||
| 			}() | ||||
| 		} | ||||
|  | ||||
| 		err = hook(ctx, cmd) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook { | ||||
| 	return func(ctx context.Context, cmds []goredis.Cmder) error { | ||||
| 		_, cmdsString := rediscmd.CmdsString(cmds) | ||||
|  | ||||
| 		opts := append(h.opts, tracer.WithSpanLabels( | ||||
| 			"db.goredis.num_cmd", strconv.Itoa(len(cmds)), | ||||
| 			"db.statement", cmdsString, | ||||
| 		)) | ||||
|  | ||||
| 		_, span := h.tr.Start(ctx, "goredis.process_pipeline", opts...) | ||||
| 		defer span.Finish() | ||||
|  | ||||
| 		err := hook(ctx, cmds) | ||||
| 		recordError(span, err) | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func setSpanError(ctx context.Context, err error) { | ||||
| 	if err == nil || err == goredis.Nil { | ||||
| 		return | ||||
| 	} | ||||
| 	if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil { | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func recordError(span tracer.Span, err error) { | ||||
| 	if err != nil && err != goredis.Nil { | ||||
| 		span.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func formatDBConnString(network, addr string) string { | ||||
| 	if network == "tcp" { | ||||
| 		network = "redis" | ||||
| 	} | ||||
| 	return fmt.Sprintf("%s://%s", network, addr) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user