Compare commits
3 Commits
Author | SHA1 | Date | |
---|---|---|---|
c8d21b4ef4 | |||
195c604a31 | |||
b263b69031 |
12
options.go
12
options.go
@ -5,14 +5,8 @@ import (
|
|||||||
"go.unistack.org/micro/v4/options"
|
"go.unistack.org/micro/v4/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
type configKey struct{}
|
type universalConfigKey struct{}
|
||||||
|
|
||||||
func Config(c *redis.Options) options.Option {
|
func UniversalConfig(c *redis.UniversalOptions) options.Option {
|
||||||
return options.ContextOption(configKey{}, c)
|
return options.ContextOption(universalConfigKey{}, c)
|
||||||
}
|
|
||||||
|
|
||||||
type clusterConfigKey struct{}
|
|
||||||
|
|
||||||
func ClusterConfig(c *redis.ClusterOptions) options.Option {
|
|
||||||
return options.ContextOption(clusterConfigKey{}, c)
|
|
||||||
}
|
}
|
||||||
|
82
redis.go
82
redis.go
@ -14,21 +14,7 @@ import (
|
|||||||
|
|
||||||
type Store struct {
|
type Store struct {
|
||||||
opts store.Options
|
opts store.Options
|
||||||
cli redisClient
|
cli redis.UniversalClient
|
||||||
}
|
|
||||||
|
|
||||||
type redisClient interface {
|
|
||||||
Get(ctx context.Context, key string) *redis.StringCmd
|
|
||||||
Del(ctx context.Context, keys ...string) *redis.IntCmd
|
|
||||||
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
|
|
||||||
Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
|
|
||||||
MGet(ctx context.Context, keys ...string) *redis.SliceCmd
|
|
||||||
MSet(ctx context.Context, kv ...interface{}) *redis.StatusCmd
|
|
||||||
Exists(ctx context.Context, keys ...string) *redis.IntCmd
|
|
||||||
Ping(ctx context.Context) *redis.StatusCmd
|
|
||||||
Pipeline() redis.Pipeliner
|
|
||||||
Pipelined(ctx context.Context, fn func(redis.Pipeliner) error) ([]redis.Cmder, error)
|
|
||||||
Close() error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) Connect(ctx context.Context) error {
|
func (r *Store) Connect(ctx context.Context) error {
|
||||||
@ -43,6 +29,10 @@ func (r *Store) Init(opts ...options.Option) error {
|
|||||||
return r.configure()
|
return r.configure()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Store) Redis() redis.UniversalClient {
|
||||||
|
return r.cli
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Store) Disconnect(ctx context.Context) error {
|
func (r *Store) Disconnect(ctx context.Context) error {
|
||||||
return r.cli.Close()
|
return r.cli.Close()
|
||||||
}
|
}
|
||||||
@ -346,50 +336,27 @@ func NewStore(opts ...options.Option) *Store {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Store) configure() error {
|
func (r *Store) configure() error {
|
||||||
var redisOptions *redis.Options
|
var redisUniversalOptions *redis.UniversalOptions
|
||||||
var redisClusterOptions *redis.ClusterOptions
|
|
||||||
var err error
|
|
||||||
|
|
||||||
nodes := r.opts.Address
|
|
||||||
|
|
||||||
if len(nodes) == 0 {
|
|
||||||
nodes = []string{"redis://127.0.0.1:6379"}
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.cli != nil && r.opts.Context == nil {
|
if r.cli != nil && r.opts.Context == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.opts.Context != nil {
|
if r.opts.Context != nil {
|
||||||
if c, ok := r.opts.Context.Value(configKey{}).(*redis.Options); ok {
|
if c, ok := r.opts.Context.Value(universalConfigKey{}).(*redis.UniversalOptions); ok {
|
||||||
redisOptions = c
|
redisUniversalOptions = c
|
||||||
if r.opts.TLSConfig != nil {
|
if r.opts.TLSConfig != nil {
|
||||||
redisOptions.TLSConfig = r.opts.TLSConfig
|
redisUniversalOptions.TLSConfig = r.opts.TLSConfig
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if c, ok := r.opts.Context.Value(clusterConfigKey{}).(*redis.ClusterOptions); ok {
|
|
||||||
redisClusterOptions = c
|
|
||||||
if r.opts.TLSConfig != nil {
|
|
||||||
redisClusterOptions.TLSConfig = r.opts.TLSConfig
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions != nil && redisClusterOptions != nil {
|
if redisUniversalOptions == nil && r.cli != nil {
|
||||||
return fmt.Errorf("must specify only one option Config or ClusterConfig")
|
|
||||||
}
|
|
||||||
|
|
||||||
if redisOptions == nil && redisClusterOptions == nil && r.cli != nil {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions == nil && redisClusterOptions == nil && len(nodes) == 1 {
|
if redisUniversalOptions == nil {
|
||||||
redisOptions, err = redis.ParseURL(nodes[0])
|
redisUniversalOptions = &redis.UniversalOptions{
|
||||||
if err != nil {
|
|
||||||
// Backwards compatibility
|
|
||||||
redisOptions = &redis.Options{
|
|
||||||
Addr: nodes[0],
|
|
||||||
Username: "",
|
Username: "",
|
||||||
Password: "", // no password set
|
Password: "", // no password set
|
||||||
DB: 0, // use default DB
|
DB: 0, // use default DB
|
||||||
@ -403,27 +370,14 @@ func (r *Store) configure() error {
|
|||||||
TLSConfig: r.opts.TLSConfig,
|
TLSConfig: r.opts.TLSConfig,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if redisOptions == nil && redisClusterOptions == nil && len(nodes) > 1 {
|
|
||||||
redisClusterOptions = &redis.ClusterOptions{
|
if len(r.opts.Address) > 0 {
|
||||||
Addrs: nodes,
|
redisUniversalOptions.Addrs = r.opts.Address
|
||||||
Username: "",
|
} else if len(redisUniversalOptions.Addrs) == 0 {
|
||||||
Password: "", // no password set
|
redisUniversalOptions.Addrs = []string{"redis://127.0.0.1:6379"}
|
||||||
MaxRetries: 2,
|
|
||||||
MaxRetryBackoff: 256 * time.Millisecond,
|
|
||||||
DialTimeout: 1 * time.Second,
|
|
||||||
ReadTimeout: 1 * time.Second,
|
|
||||||
WriteTimeout: 1 * time.Second,
|
|
||||||
PoolTimeout: 1 * time.Second,
|
|
||||||
MinIdleConns: 10,
|
|
||||||
TLSConfig: r.opts.TLSConfig,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions != nil {
|
r.cli = redis.NewUniversalClient(redisUniversalOptions)
|
||||||
r.cli = redis.NewClient(redisOptions)
|
|
||||||
} else if redisClusterOptions != nil {
|
|
||||||
r.cli = redis.NewClusterClient(redisClusterOptions)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -72,11 +72,11 @@ func Test_rkv_configure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
r := &Store{
|
rc := &Store{
|
||||||
opts: tt.fields.options,
|
opts: tt.fields.options,
|
||||||
cli: tt.fields.Client,
|
cli: tt.fields.Client,
|
||||||
}
|
}
|
||||||
err := r.configure()
|
err := rc.configure()
|
||||||
if (err != nil) != tt.wantErr {
|
if (err != nil) != tt.wantErr {
|
||||||
t.Errorf("configure() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("configure() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user