5 Commits

Author SHA1 Message Date
35b4ea057c fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:08:01 +03:00
46fbd9846a fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:56:52 +03:00
002a038413 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:45:22 +03:00
13a4527f83 update go deps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:12:39 +03:00
2a7ce9411b fixup options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-09 18:58:31 +03:00
5 changed files with 19 additions and 31 deletions

3
go.mod
View File

@@ -1,8 +1,9 @@
module go.unistack.org/micro-broker-redis/v3
go 1.23.1
go 1.22
require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
github.com/redis/go-redis/v9 v9.6.1
go.unistack.org/micro/v3 v3.10.84
)

2
go.sum
View File

@@ -6,6 +6,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20=
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4=

View File

@@ -7,20 +7,13 @@ import (
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/store"
"go.unistack.org/micro/v3/tracer"
)
type configKey struct{}
func Config(c *redis.Options) store.Option {
return store.SetOption(configKey{}, c)
}
type clusterConfigKey struct{}
func ClusterConfig(c *redis.ClusterOptions) store.Option {
return store.SetOption(clusterConfigKey{}, c)
func Config(c *redis.UniversalOptions) broker.Option {
return broker.SetOption(configKey{}, c)
}
var (

View File

@@ -230,45 +230,41 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
done: make(chan struct{}),
}
// Run the receiver routine.
go s.loop()
s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err
}
go s.loop()
return s, nil
}
func (b *Broker) configure() error {
redisOptions := DefaultOptions
if b.cli != nil && b.opts.Context == nil {
return nil
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
redisOptions = c
}
}
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
redisOptions = c
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
}
if len(b.opts.Addrs) > 0 {
redisOptions.Addrs = b.opts.Addrs
}
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
if redisOptions == nil && b.cli != nil {
return nil
}
if redisOptions == nil {
redisOptions.Addrs = b.opts.Addrs
redisOptions.TLSConfig = b.opts.TLSConfig
}
c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer)
b.cli = c
b.statsMeter()
return nil

View File

@@ -13,10 +13,6 @@ var (
PoolConnTotalCurrent = "pool_conn_total_current"
PoolConnIdleCurrent = "pool_conn_idle_current"
PoolConnStaleTotal = "pool_conn_stale_total"
meterRequestTotal = "request_total"
meterRequestLatencyMicroseconds = "latency_microseconds"
meterRequestDurationSeconds = "request_duration_seconds"
)
type Statser interface {