diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..723ef36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea \ No newline at end of file diff --git a/event.go b/event.go new file mode 100644 index 0000000..2a2d378 --- /dev/null +++ b/event.go @@ -0,0 +1,86 @@ +package redis + +import ( + "context" + "errors" + "net" + "time" + + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v4/store" +) + +type eventHook struct { + s *Store +} + +var _ goredis.Hook = (*eventHook)(nil) + +func newEventHook(s *Store) *eventHook { + return &eventHook{s: s} +} + +func (h *eventHook) DialHook(hook goredis.DialHook) goredis.DialHook { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := hook(ctx, network, addr) + if err != nil { + if !isRedisError(err) { + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) + } + } else { + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } + } + return conn, err + } +} + +func (h *eventHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { + return func(ctx context.Context, cmd goredis.Cmder) error { + err := hook(ctx, cmd) + if err != nil { + if !isRedisError(err) { + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) + } + } else { + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } + } + return err + } +} + +func (h *eventHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook { + return func(ctx context.Context, cmds []goredis.Cmder) error { + err := hook(ctx, cmds) + if err != nil { + if !isRedisError(err) { + if h.s.connected.CompareAndSwap(1, 0) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeDisconnect}) + } + } else { + h.s.connected.Store(1) + } + } else { + if h.s.connected.CompareAndSwap(0, 1) { + h.s.sendEvent(&event{ts: time.Now(), err: err, t: store.EventTypeConnect}) + } + } + return err + } +} + +func isRedisError(err error) bool { + var rerr goredis.Error + return errors.As(err, &rerr) +} diff --git a/go.mod b/go.mod index 5cd0226..10a9fc7 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,8 @@ go 1.22.0 toolchain go1.24.2 require ( - github.com/redis/go-redis/v9 v9.7.3 + github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 + github.com/redis/go-redis/v9 v9.7.0 go.unistack.org/micro/v4 v4.1.6 ) @@ -13,11 +14,11 @@ require ( github.com/ash3in/uuidv8 v1.2.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect - github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/spf13/cast v1.7.1 // indirect go.unistack.org/micro-proto/v4 v4.1.0 // indirect - google.golang.org/protobuf v1.36.6 // indirect + google.golang.org/protobuf v1.36.3 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b58873d..3f0daa1 100644 --- a/go.sum +++ b/go.sum @@ -1,33 +1,43 @@ +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= -github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= -github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= -github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= -github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= -github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= +github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk= +github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk= go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= -go.unistack.org/micro/v4 v4.0.7 h1:2lwtZlHcSwgkahhFbkI4x1lOS79lw8uLHtcEhlFF+AM= -go.unistack.org/micro/v4 v4.0.7/go.mod h1:bVEYTlPi0EsdgZZt311bIroDg9ict7ky3C87dSCCAGk= go.unistack.org/micro/v4 v4.1.6 h1:sYLpe1Vd8/lDwddtV0BLTvJ+i+fllXAS4fZngT1wKZ4= go.unistack.org/micro/v4 v4.1.6/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go index be1314c..3aed042 100644 --- a/options.go +++ b/options.go @@ -1,12 +1,67 @@ package redis import ( - "github.com/redis/go-redis/v9" + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v4/logger" + "go.unistack.org/micro/v4/meter" "go.unistack.org/micro/v4/store" + "go.unistack.org/micro/v4/tracer" ) +type configKey struct{} + +func Config(c *goredis.Options) store.Option { + return store.SetOption(configKey{}, c) +} + +type clusterConfigKey struct{} + +func ClusterConfig(c *goredis.ClusterOptions) store.Option { + return store.SetOption(clusterConfigKey{}, c) +} + type universalConfigKey struct{} -func UniversalConfig(c *redis.UniversalOptions) store.Option { +func UniversalConfig(c *goredis.UniversalOptions) store.Option { return store.SetOption(universalConfigKey{}, c) } + +var ( + labelHost = "redis_host" + labelName = "redis_name" +) + +// Options struct holds wrapper options +type Options struct { + Logger logger.Logger + Meter meter.Meter + Tracer tracer.Tracer + RedisHost string + RedisName string +} + +// Option func signature +type Option func(*Options) + +// NewOptions create new Options struct from provided option slice +func NewOptions(opts ...Option) Options { + options := Options{ + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, + } + + for _, o := range opts { + o(&options) + } + + options.Meter = options.Meter.Clone( + meter.Labels( + labelHost, options.RedisHost, + labelName, options.RedisName), + ) + + options.Logger = options.Logger.Clone(logger.WithAddCallerSkipCount(1)) + + return options +} diff --git a/redis.go b/redis.go index 71df9db..d9b6dad 100755 --- a/redis.go +++ b/redis.go @@ -2,120 +2,289 @@ package redis import ( "context" - "fmt" + "errors" "reflect" "strings" + "sync" + "sync/atomic" "time" - "github.com/redis/go-redis/v9" - "go.unistack.org/micro/v4/options" + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v4/semconv" "go.unistack.org/micro/v4/store" + "go.unistack.org/micro/v4/util/id" + pool "go.unistack.org/micro/v4/util/xpool" +) + +var ( + _ store.Store = (*Store)(nil) + _ store.Event = (*event)(nil) + sendEventTime = 10 * time.Millisecond + DefaultUniversalOptions = &goredis.UniversalOptions{ + Username: "", + Password: "", // no password set + MaxRetries: 2, + MaxRetryBackoff: 256 * time.Millisecond, + DialTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + PoolTimeout: 1 * time.Second, + MinIdleConns: 10, + } + + DefaultClusterOptions = &goredis.ClusterOptions{ + Username: "", + Password: "", // no password set + MaxRetries: 2, + MaxRetryBackoff: 256 * time.Millisecond, + DialTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + PoolTimeout: 1 * time.Second, + MinIdleConns: 10, + } + + DefaultOptions = &goredis.Options{ + Username: "", + Password: "", // no password set + DB: 0, // use default DB + MaxRetries: 2, + MaxRetryBackoff: 256 * time.Millisecond, + DialTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + PoolTimeout: 1 * time.Second, + MinIdleConns: 10, + } ) type Store struct { - opts store.Options - cli redis.UniversalClient + cli goredis.UniversalClient + pool *pool.StringsPool + connected *atomic.Uint32 + opts store.Options + watchers map[string]*watcher + mu sync.RWMutex +} + +func (r *Store) Live() bool { + return r.connected.Load() == 1 +} + +func (r *Store) Ready() bool { + return r.connected.Load() == 1 +} + +func (r *Store) Health() bool { + return r.connected.Load() == 1 } func (r *Store) Connect(ctx context.Context) error { - return r.cli.Ping(ctx).Err() -} - -func (r *Store) Init(opts ...options.Option) error { - return r.configure(opts...) -} - -func (r *Store) Redis() redis.UniversalClient { - return r.cli -} - -func (r *Store) Disconnect(ctx context.Context) error { - return r.cli.Close() -} - -func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error { - if r.opts.Timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) - defer cancel() + if r.connected.Load() == 1 { + return nil } - options := store.NewExistsOptions(opts...) - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace + if r.cli == nil { + return store.ErrNotConnected } - if options.Namespace != "" { - key = fmt.Sprintf("%s%s", r.opts.Namespace, key) + if r.opts.LazyConnect { + return nil } - val, err := r.cli.Exists(ctx, key).Result() + if err := r.cli.Ping(ctx).Err(); err != nil { + setSpanError(ctx, err) + return err + } + r.connected.Store(1) + return nil +} + +func (r *Store) Init(opts ...store.Option) error { + err := r.configure(opts...) if err != nil { return err } - if val == 0 { - return store.ErrNotFound + + return nil +} + +func (r *Store) Client() *goredis.Client { + if c, ok := r.cli.(*goredis.Client); ok { + return c } return nil } -func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { - if r.opts.Timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) - defer cancel() +func (r *Store) UniversalClient() goredis.UniversalClient { + return r.cli +} + +func (r *Store) ClusterClient() *goredis.ClusterClient { + if c, ok := r.cli.(*goredis.ClusterClient); ok { + return c } - options := store.NewReadOptions(opts...) - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace - } - if options.Namespace != "" { - key = fmt.Sprintf("%s%s", options.Namespace, key) + return nil +} + +func (r *Store) Disconnect(ctx context.Context) error { + if r.connected.Load() == 0 { + return nil } - buf, err := r.cli.Get(ctx, key).Bytes() - if err != nil && err == redis.Nil { + if r.cli != nil { + if err := r.cli.Close(); err != nil { + return err + } + } + + r.connected.Store(1) + return nil +} + +func (r *Store) Exists(ctx context.Context, key string, opts ...store.ExistsOption) error { + b := r.pool.Get() + defer r.pool.Put(b) + options := store.NewExistsOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "exists") + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout + } + + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + ts := time.Now() + val, err := r.cli.Exists(ctx, rkey).Result() + setSpanError(ctx, err) + te := time.Since(ts) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) || (err == nil && val == 0) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() return store.ErrNotFound - } else if err != nil { + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() + } else { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } - if buf == nil { - return store.ErrNotFound + + return nil +} + +func (r *Store) Read(ctx context.Context, key string, val interface{}, opts ...store.ReadOption) error { + b := r.pool.Get() + defer r.pool.Put(b) + + options := store.NewReadOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "read") + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout } - return r.opts.Codec.Unmarshal(buf, val) + + if timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + ts := time.Now() + buf, err := r.cli.Get(ctx, rkey).Bytes() + setSpanError(ctx, err) + te := time.Since(ts) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) || (err == nil && buf == nil) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() + } else if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() + return err + } + + switch b := val.(type) { + case *[]byte: + *b = buf + case *string: + *b = string(buf) + default: + if err = r.opts.Codec.Unmarshal(buf, val); err != nil { + setSpanError(ctx, err) + } + } + + return err } func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts ...store.ReadOption) error { - if len(keys) == 1 { - vt := reflect.ValueOf(vals) - if vt.Kind() == reflect.Ptr { - vt = reflect.Indirect(vt) - return r.Read(ctx, keys[0], vt.Index(0).Interface(), opts...) - } + options := store.NewReadOptions(opts...) + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout } - if r.opts.Timeout > 0 { + + if timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - options := store.NewReadOptions(opts...) - rkeys := make([]string, 0, len(keys)) - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace - } - for _, key := range keys { - if options.Namespace != "" { - rkeys = append(rkeys, fmt.Sprintf("%s%s", options.Namespace, key)) - } else { - rkeys = append(rkeys, key) + + var rkeys []string + var pools []*strings.Builder + if r.opts.Namespace != "" || options.Namespace != "" { + rkeys = make([]string, len(keys)) + pools = make([]*strings.Builder, len(keys)) + for idx, key := range keys { + b := r.pool.Get() + pools[idx] = b + rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key) } } - rvals, err := r.cli.MGet(ctx, rkeys...).Result() - if err != nil && err == redis.Nil { - return store.ErrNotFound - } else if err != nil { - return err + r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Inc() + ts := time.Now() + var rvals []interface{} + var err error + if r.opts.Namespace != "" || options.Namespace != "" { + rvals, err = r.cli.MGet(ctx, rkeys...).Result() + for idx := range pools { + r.pool.Put(pools[idx]) + } + } else { + rvals, err = r.cli.MGet(ctx, keys...).Result() } - if len(rvals) == 0 { + setSpanError(ctx, err) + te := time.Since(ts) + r.opts.Meter.Counter(semconv.StoreRequestInflight, "name", options.Name).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, "name", options.Name).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, "name", options.Name).Update(te.Seconds()) + if err == goredis.Nil || (len(rvals) == 0) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "miss").Inc() return store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "hit").Inc() + } else if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, "name", options.Name, "status", "failure").Inc() + return err } vv := reflect.ValueOf(vals) @@ -146,77 +315,144 @@ func (r *Store) MRead(ctx context.Context, keys []string, vals interface{}, opts // special case for raw data if vt.Kind() == reflect.Slice && vt.Elem().Kind() == reflect.Uint8 { itm.Set(reflect.MakeSlice(itm.Type(), len(buf), len(buf))) - } else { - itm.Set(reflect.New(vt.Elem())) + itm.SetBytes(buf) + continue + } else if vt.Kind() == reflect.String { + itm.SetString(string(buf)) + continue } + + itm.Set(reflect.New(vt.Elem())) if err = r.opts.Codec.Unmarshal(buf, itm.Interface()); err != nil { + setSpanError(ctx, err) return err } } vv.Set(nvv) + return nil } func (r *Store) MDelete(ctx context.Context, keys []string, opts ...store.DeleteOption) error { - if len(keys) == 1 { - return r.Delete(ctx, keys[0], opts...) + options := store.NewDeleteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "delete") + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout } - if r.opts.Timeout > 0 { + + if timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - options := store.NewDeleteOptions(opts...) - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace + + var rkeys []string + var pools []*strings.Builder + if r.opts.Namespace != "" || options.Namespace != "" { + rkeys = make([]string, len(keys)) + pools = make([]*strings.Builder, len(keys)) + for idx, key := range keys { + b := r.pool.Get() + pools[idx] = b + rkeys[idx] = r.getKey(b, r.opts.Namespace, options.Namespace, key) + } } - if options.Namespace == "" { - return r.cli.Del(ctx, keys...).Err() + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + ts := time.Now() + var err error + if r.opts.Namespace != "" || options.Namespace != "" { + err = r.cli.Del(ctx, rkeys...).Err() + for idx := range pools { + r.pool.Put(pools[idx]) + } + } else { + err = r.cli.Del(ctx, keys...).Err() } - for idx := range keys { - keys[idx] = options.Namespace + keys[idx] + setSpanError(ctx, err) + te := time.Since(ts) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if err == goredis.Nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() + } else if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() + return err } - return r.cli.Del(ctx, keys...).Err() + + return nil } func (r *Store) Delete(ctx context.Context, key string, opts ...store.DeleteOption) error { - if r.opts.Timeout > 0 { + b := r.pool.Get() + defer r.pool.Put(b) + + options := store.NewDeleteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "delete") + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout + } + + if timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - options := store.NewDeleteOptions(opts...) - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + ts := time.Now() + err := r.cli.Del(ctx, r.getKey(b, r.opts.Namespace, options.Namespace, key)).Err() + te := time.Since(ts) + setSpanError(ctx, err) + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() + } else if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() + return err } - if options.Namespace == "" { - return r.cli.Del(ctx, key).Err() - } - return r.cli.Del(ctx, fmt.Sprintf("%s%s", options.Namespace, key)).Err() + + return nil } func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, opts ...store.WriteOption) error { - if len(keys) == 1 { - return r.Write(ctx, keys[0], vals[0], opts...) + options := store.NewWriteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "write") + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout } - if r.opts.Timeout > 0 { + + if timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - options := store.NewWriteOptions(opts...) + kvs := make([]string, 0, len(keys)*2) - - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace - } - + pools := make([]*strings.Builder, len(keys)) for idx, key := range keys { - if options.Namespace != "" { - kvs = append(kvs, fmt.Sprintf("%s%s", options.Namespace, key)) - } else { - kvs = append(kvs, key) - } + b := r.pool.Get() + pools[idx] = b + kvs = append(kvs, r.getKey(b, r.opts.Namespace, options.Namespace, key)) switch vt := vals[idx].(type) { case string: @@ -232,21 +468,47 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } } - cmds, err := r.cli.Pipelined(ctx, func(pipe redis.Pipeliner) error { - var err error + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + + pipeliner := func(pipe goredis.Pipeliner) error { for idx := 0; idx < len(kvs); idx += 2 { - if _, err = pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil { + if _, err := pipe.Set(ctx, kvs[idx], kvs[idx+1], options.TTL).Result(); err != nil { + setSpanError(ctx, err) return err } } return nil - }) - if err != nil { + } + + ts := time.Now() + cmds, err := r.cli.Pipelined(ctx, pipeliner) + for idx := range pools { + r.pool.Put(pools[idx]) + } + te := time.Since(ts) + setSpanError(ctx, err) + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if err == goredis.Nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() + } else if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } for _, cmd := range cmds { if err = cmd.Err(); err != nil { + if err == goredis.Nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return store.ErrNotFound + } + setSpanError(ctx, err) + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return err } } @@ -255,16 +517,25 @@ func (r *Store) MWrite(ctx context.Context, keys []string, vals []interface{}, o } func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ...store.WriteOption) error { - if r.opts.Timeout > 0 { + b := r.pool.Get() + defer r.pool.Put(b) + + options := store.NewWriteOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "write") + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout + } + + if timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - options := store.NewWriteOptions(opts...) - if len(options.Namespace) == 0 { - options.Namespace = r.opts.Namespace - } + rkey := r.getKey(b, r.opts.Namespace, options.Namespace, key) var buf []byte switch vt := val.(type) { @@ -280,42 +551,103 @@ func (r *Store) Write(ctx context.Context, key string, val interface{}, opts ... } } - if options.Namespace != "" { - key = fmt.Sprintf("%s%s", options.Namespace, key) + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + ts := time.Now() + err := r.cli.Set(ctx, rkey, buf, options.TTL).Err() + te := time.Since(ts) + setSpanError(ctx, err) + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if errors.Is(err, goredis.Nil) { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "hit")...).Inc() + } else { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() + return err } - return r.cli.Set(ctx, key, buf, options.TTL).Err() + return err } func (r *Store) List(ctx context.Context, opts ...store.ListOption) ([]string, error) { + b := r.pool.Get() + defer r.pool.Put(b) + options := store.NewListOptions(opts...) + labels := make([]string, 0, 6) + labels = append(labels, "name", options.Name, "statement", "list") + if len(options.Namespace) == 0 { options.Namespace = r.opts.Namespace } - rkey := fmt.Sprintf("%s%s*", options.Namespace, options.Prefix) + rkey := r.getKey(b, options.Namespace, "", options.Prefix+"*") if options.Suffix != "" { rkey += options.Suffix } - if r.opts.Timeout > 0 { + + timeout := r.opts.Timeout + if options.Timeout > 0 { + timeout = options.Timeout + } + + if timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.opts.Timeout) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } + // TODO: add support for prefix/suffix/limit - keys, err := r.cli.Keys(ctx, rkey).Result() - if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Inc() + ts := time.Now() + var keys []string + var err error + + if c, ok := r.cli.(*goredis.ClusterClient); ok { + err = c.ForEachMaster(ctx, func(nctx context.Context, cli *goredis.Client) error { + nkeys, nerr := cli.Keys(nctx, rkey).Result() + if nerr != nil { + return nerr + } + keys = append(keys, nkeys...) + return nil + }) + } else { + keys, err = r.cli.Keys(ctx, rkey).Result() + } + te := time.Since(ts) + setSpanError(ctx, err) + + r.opts.Meter.Counter(semconv.StoreRequestInflight, labels...).Dec() + r.opts.Meter.Summary(semconv.StoreRequestLatencyMicroseconds, labels...).Update(te.Seconds()) + r.opts.Meter.Histogram(semconv.StoreRequestDurationSeconds, labels...).Update(te.Seconds()) + if err == goredis.Nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "miss")...).Inc() + return nil, store.ErrNotFound + } else if err == nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "git")...).Inc() + } else if err != nil { + r.opts.Meter.Counter(semconv.StoreRequestTotal, append(labels, "status", "failure")...).Inc() return nil, err } - if options.Namespace == "" { + + prefix := r.opts.Namespace + if options.Namespace != "" { + prefix = options.Namespace + } + if prefix == "" { return keys, nil } - nkeys := make([]string, 0, len(keys)) - for _, key := range keys { - nkeys = append(nkeys, strings.TrimPrefix(key, options.Namespace)) + for idx, key := range keys { + keys[idx] = strings.TrimPrefix(key, prefix) } - return nkeys, nil + + return keys, nil } func (r *Store) Options() store.Options { @@ -331,59 +663,214 @@ func (r *Store) String() string { } func NewStore(opts ...store.Option) *Store { - return &Store{opts: store.NewOptions(opts...)} + b := atomic.Uint32{} + return &Store{ + opts: store.NewOptions(opts...), + connected: &b, + watchers: make(map[string]*watcher), + } } -func (r *Store) configure(opts ...options.Option) error { +func (r *Store) configure(opts ...store.Option) error { if r.cli != nil && len(opts) == 0 { return nil } - var redisUniversalOptions *redis.UniversalOptions - var err error - for _, o := range opts { - if err = o(r.opts); err != nil { - return err - } + o(&r.opts) } + universalOptions := DefaultUniversalOptions + if r.opts.Context != nil { - if c, ok := r.opts.Context.Value(universalConfigKey{}).(*redis.UniversalOptions); ok { - redisUniversalOptions = c - if r.opts.TLSConfig != nil { - redisUniversalOptions.TLSConfig = r.opts.TLSConfig + if o, ok := r.opts.Context.Value(configKey{}).(*goredis.Options); ok { + universalOptions.Addrs = []string{o.Addr} + universalOptions.Dialer = o.Dialer + universalOptions.OnConnect = o.OnConnect + universalOptions.Username = o.Username + universalOptions.Password = o.Password + + universalOptions.MaxRetries = o.MaxRetries + universalOptions.MinRetryBackoff = o.MinRetryBackoff + universalOptions.MaxRetryBackoff = o.MaxRetryBackoff + + universalOptions.DialTimeout = o.DialTimeout + universalOptions.ReadTimeout = o.ReadTimeout + universalOptions.WriteTimeout = o.WriteTimeout + universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled + + universalOptions.PoolFIFO = o.PoolFIFO + + universalOptions.PoolSize = o.PoolSize + universalOptions.PoolTimeout = o.PoolTimeout + universalOptions.MinIdleConns = o.MinIdleConns + universalOptions.MaxIdleConns = o.MaxIdleConns + universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime + universalOptions.ConnMaxLifetime = o.ConnMaxLifetime + + if o.TLSConfig != nil { + universalOptions.TLSConfig = o.TLSConfig + } + } + + if o, ok := r.opts.Context.Value(clusterConfigKey{}).(*goredis.ClusterOptions); ok { + universalOptions.Addrs = o.Addrs + universalOptions.Dialer = o.Dialer + universalOptions.OnConnect = o.OnConnect + universalOptions.Username = o.Username + universalOptions.Password = o.Password + + universalOptions.MaxRedirects = o.MaxRedirects + universalOptions.ReadOnly = o.ReadOnly + universalOptions.RouteByLatency = o.RouteByLatency + universalOptions.RouteRandomly = o.RouteRandomly + + universalOptions.MaxRetries = o.MaxRetries + universalOptions.MinRetryBackoff = o.MinRetryBackoff + universalOptions.MaxRetryBackoff = o.MaxRetryBackoff + + universalOptions.DialTimeout = o.DialTimeout + universalOptions.ReadTimeout = o.ReadTimeout + universalOptions.WriteTimeout = o.WriteTimeout + universalOptions.ContextTimeoutEnabled = o.ContextTimeoutEnabled + + universalOptions.PoolFIFO = o.PoolFIFO + + universalOptions.PoolSize = o.PoolSize + universalOptions.PoolTimeout = o.PoolTimeout + universalOptions.MinIdleConns = o.MinIdleConns + universalOptions.MaxIdleConns = o.MaxIdleConns + universalOptions.ConnMaxIdleTime = o.ConnMaxIdleTime + universalOptions.ConnMaxLifetime = o.ConnMaxLifetime + if o.TLSConfig != nil { + universalOptions.TLSConfig = o.TLSConfig + } + } + + if o, ok := r.opts.Context.Value(universalConfigKey{}).(*goredis.UniversalOptions); ok { + universalOptions = o + if o.TLSConfig != nil { + universalOptions.TLSConfig = o.TLSConfig } } } - if redisUniversalOptions == nil && r.cli != nil { - return nil - } - - if redisUniversalOptions == nil { - redisUniversalOptions = &redis.UniversalOptions{ - Username: "", - Password: "", // no password set - DB: 0, // use default DB - MaxRetries: 2, - MaxRetryBackoff: 256 * time.Millisecond, - DialTimeout: 1 * time.Second, - ReadTimeout: 1 * time.Second, - WriteTimeout: 1 * time.Second, - PoolTimeout: 1 * time.Second, - MinIdleConns: 10, - TLSConfig: r.opts.TLSConfig, - } - } - if len(r.opts.Addrs) > 0 { - redisUniversalOptions.Addrs = r.opts.Addrs - } else if len(redisUniversalOptions.Addrs) == 0 { - redisUniversalOptions.Addrs = []string{"redis://127.0.0.1:6379"} + universalOptions.Addrs = r.opts.Addrs + } else { + universalOptions.Addrs = []string{"127.0.0.1:6379"} } - r.cli = redis.NewUniversalClient(redisUniversalOptions) + r.cli = goredis.NewUniversalClient(universalOptions) + setTracing(r.cli, r.opts.Tracer) + r.cli.AddHook(newEventHook(r)) + + r.pool = pool.NewStringsPool(50) + + r.statsMeter() return nil } + +func (r *Store) getKey(b *strings.Builder, mainNamespace string, opNamespace string, key string) string { + if opNamespace == "" { + opNamespace = mainNamespace + } + if opNamespace != "" { + b.WriteString(opNamespace) + b.WriteString(r.opts.Separator) + } + b.WriteString(key) + return b.String() +} + +func (r *Store) Watch(ctx context.Context, opts ...store.WatchOption) (store.Watcher, error) { + id, err := id.New() + if err != nil { + return nil, err + } + wo, err := store.NewWatchOptions(opts...) + if err != nil { + return nil, err + } + // construct the watcher + w := &watcher{ + exit: make(chan bool), + ch: make(chan store.Event), + id: id, + opts: wo, + } + + r.mu.Lock() + r.watchers[w.id] = w + r.mu.Unlock() + + return w, nil +} + +func (r *Store) sendEvent(e store.Event) { + r.mu.RLock() + watchers := make([]*watcher, 0, len(r.watchers)) + for _, w := range r.watchers { + watchers = append(watchers, w) + } + r.mu.RUnlock() + for _, w := range watchers { + select { + case <-w.exit: + r.mu.Lock() + delete(r.watchers, w.id) + r.mu.Unlock() + default: + select { + case w.ch <- e: + case <-time.After(sendEventTime): + } + } + } +} + +type watcher struct { + ch chan store.Event + exit chan bool + opts store.WatchOptions + id string +} + +func (w *watcher) Next() (store.Event, error) { + for { + select { + case e := <-w.ch: + return e, nil + case <-w.exit: + return nil, store.ErrWatcherStopped + } + } +} + +func (w *watcher) Stop() { + select { + case <-w.exit: + return + default: + close(w.exit) + } +} + +type event struct { + ts time.Time + t store.EventType + err error +} + +func (e *event) Error() error { + return e.err +} + +func (e *event) Timestamp() time.Time { + return e.ts +} + +func (e *event) Type() store.EventType { + return e.t +} diff --git a/redis_test.go b/redis_test.go index 93b4b80..6652904 100755 --- a/redis_test.go +++ b/redis_test.go @@ -4,17 +4,67 @@ import ( "bytes" "context" "os" + "sync/atomic" "testing" "time" - "github.com/redis/go-redis/v9" + goredis "github.com/redis/go-redis/v9" "go.unistack.org/micro/v4/store" + "go.unistack.org/micro/v4/tracer" ) +func TestLazyConnect(t *testing.T) { + t.Skip("skipping test for manual check") + ctx := context.Background() + var err error + + r := NewStore() + + if err = r.Init(); err != nil { + t.Fatal(err) + } + if err = r.Connect(ctx); err != nil { + t.Logf("connect failed %v", err) + } + + for { + if err = r.Write(ctx, "mykey", "myval"); err != nil { + t.Logf("failed to write %v", err) + } + } +} + +func TestKeepTTL(t *testing.T) { + ctx := context.Background() + + if tr := os.Getenv("INTEGRATION_TESTS"); len(tr) > 0 { + t.Skip() + } + r := NewStore(store.Addrs(os.Getenv("STORE_NODES"))) + + if err := r.Init(store.LazyConnect(true)); err != nil { + t.Fatal(err) + } + if err := r.Connect(ctx); err != nil { + t.Fatal(err) + } + + key := "key" + err := r.Write(ctx, key, "val1", store.WriteTTL(15*time.Second)) + if err != nil { + t.Fatalf("Write error: %v", err) + } + time.Sleep(3 * time.Second) + err = r.Write(ctx, key, "val2", store.WriteTTL(-1)) + if err != nil { + t.Fatalf("Write error: %v", err) + } +} + func Test_rkv_configure(t *testing.T) { type fields struct { options store.Options - Client *redis.Client + Client goredis.UniversalClient } type wantValues struct { username string @@ -37,7 +87,7 @@ func Test_rkv_configure(t *testing.T) { }, }, { - name: "legacy Url", fields: fields{options: store.Options{Addrs: []string{"127.0.0.1:6379"}}, Client: nil}, + name: "legacy Url", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"127.0.0.1:6379"}}, Client: nil}, wantErr: false, want: wantValues{ username: "", password: "", @@ -45,7 +95,7 @@ func Test_rkv_configure(t *testing.T) { }, }, { - name: "New Url", fields: fields{options: store.Options{Addrs: []string{"redis://127.0.0.1:6379"}}, Client: nil}, + name: "New Url", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"redis://127.0.0.1:6379"}}, Client: nil}, wantErr: false, want: wantValues{ username: "", password: "", @@ -53,7 +103,7 @@ func Test_rkv_configure(t *testing.T) { }, }, { - name: "Url with Pwd", fields: fields{options: store.Options{Addrs: []string{"redis://:password@redis:6379"}}, Client: nil}, + name: "Url with Pwd", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"redis://:password@redis:6379"}}, Client: nil}, wantErr: false, want: wantValues{ username: "", password: "password", @@ -61,7 +111,7 @@ func Test_rkv_configure(t *testing.T) { }, }, { - name: "Url with username and Pwd", fields: fields{options: store.Options{Addrs: []string{"redis://username:password@redis:6379"}}, Client: nil}, + name: "Url with username and Pwd", fields: fields{options: store.Options{Tracer: tracer.DefaultTracer, Addrs: []string{"redis://username:password@redis:6379"}}, Client: nil}, wantErr: false, want: wantValues{ username: "username", password: "password", @@ -71,9 +121,11 @@ func Test_rkv_configure(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + b := atomic.Uint32{} rc := &Store{ - opts: tt.fields.options, - cli: tt.fields.Client, + opts: tt.fields.options, + cli: tt.fields.Client, + connected: &b, } err := rc.configure() if (err != nil) != tt.wantErr { diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..177273d --- /dev/null +++ b/stats.go @@ -0,0 +1,49 @@ +package redis + +import ( + "time" + + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v4/meter" +) + +var ( + PoolHitsTotal = "pool_hits_total" + PoolMissesTotal = "pool_misses_total" + PoolTimeoutTotal = "pool_timeout_total" + PoolConnTotalCurrent = "pool_conn_total_current" + PoolConnIdleCurrent = "pool_conn_idle_current" + PoolConnStaleTotal = "pool_conn_stale_total" +) + +type Statser interface { + PoolStats() *goredis.PoolStats +} + +func (r *Store) statsMeter() { + var st Statser + + if r.cli != nil { + st = r.cli + } else { + return + } + + go func() { + ticker := time.NewTicker(meter.DefaultMeterStatsInterval) + defer ticker.Stop() + + for range ticker.C { + if st == nil { + return + } + stats := st.PoolStats() + r.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits)) + r.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses)) + r.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts)) + r.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns)) + r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns)) + r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns)) + } + }() +} diff --git a/tracer.go b/tracer.go new file mode 100644 index 0000000..11199be --- /dev/null +++ b/tracer.go @@ -0,0 +1,128 @@ +package redis + +import ( + "context" + "fmt" + "net" + "strconv" + + rediscmd "github.com/redis/go-redis/extra/rediscmd/v9" + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v4/tracer" +) + +func setTracing(rdb goredis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) { + switch rdb := rdb.(type) { + case *goredis.Client: + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + case *goredis.ClusterClient: + rdb.OnNewNode(func(rdb *goredis.Client) { + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + }) + case *goredis.Ring: + rdb.OnNewNode(func(rdb *goredis.Client) { + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + }) + } +} + +type tracingHook struct { + tr tracer.Tracer + opts []tracer.SpanOption +} + +var _ goredis.Hook = (*tracingHook)(nil) + +func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook { + opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient)) + if connString != "" { + opts = append(opts, tracer.WithSpanLabels("db.connection_string", connString)) + } + + return &tracingHook{ + tr: tr, + opts: opts, + } +} + +func (h *tracingHook) DialHook(hook goredis.DialHook) goredis.DialHook { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + /* + _, span := h.tr.Start(ctx, "goredis.dial", h.opts...) + defer span.Finish() + */ + conn, err := hook(ctx, network, addr) + // recordError(span, err) + + return conn, err + } +} + +func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { + return func(ctx context.Context, cmd goredis.Cmder) error { + cmdString := rediscmd.CmdString(cmd) + var err error + + switch cmdString { + case "cluster slots": + break + default: + _, span := h.tr.Start(ctx, "sdk.database", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) + defer func() { + recordError(span, err) + span.Finish() + }() + } + + err = hook(ctx, cmd) + + return err + } +} + +func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook { + return func(ctx context.Context, cmds []goredis.Cmder) error { + _, cmdsString := rediscmd.CmdsString(cmds) + + opts := append(h.opts, tracer.WithSpanLabels( + "db.database.num_cmd", strconv.Itoa(len(cmds)), + "db.statement", cmdsString, + )) + + _, span := h.tr.Start(ctx, "sdk.database", opts...) + defer span.Finish() + + err := hook(ctx, cmds) + recordError(span, err) + + return err + } +} + +func setSpanError(ctx context.Context, err error) { + if err == nil || err == goredis.Nil { + return + } + if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +func recordError(span tracer.Span, err error) { + if err != nil && err != goredis.Nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +func formatDBConnString(network, addr string) string { + if network == "tcp" { + network = "redis" + } + return fmt.Sprintf("%s://%s", network, addr) +}