Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
1adc91ef4b | |||
906b189f8b | |||
778f78f48d | |||
de0af10391 | |||
dbbdd81a57 | |||
c7b6158602 | |||
35b4ea057c | |||
46fbd9846a | |||
002a038413 | |||
13a4527f83 | |||
2a7ce9411b |
25
.gitignore
vendored
Normal file
25
.gitignore
vendored
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
# Binaries for programs and plugins
|
||||||
|
*.exe
|
||||||
|
*.exe~
|
||||||
|
*.dll
|
||||||
|
*.so
|
||||||
|
*.dylib
|
||||||
|
bin
|
||||||
|
|
||||||
|
# Test binary, built with `go test -c`
|
||||||
|
*.test
|
||||||
|
cmd/test
|
||||||
|
|
||||||
|
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||||
|
*.out
|
||||||
|
|
||||||
|
# Dependency directories (remove the comment below to include it)
|
||||||
|
# vendor/
|
||||||
|
|
||||||
|
# Go workspace file
|
||||||
|
go.work
|
||||||
|
|
||||||
|
# General
|
||||||
|
.DS_Store
|
||||||
|
.idea
|
||||||
|
.vscode
|
12
go.mod
12
go.mod
@ -1,13 +1,17 @@
|
|||||||
module go.unistack.org/micro-broker-redis/v3
|
module go.unistack.org/micro-broker-redis/v3
|
||||||
|
|
||||||
go 1.23.1
|
go 1.22
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/redis/go-redis/v9 v9.6.1
|
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2
|
||||||
go.unistack.org/micro/v3 v3.10.84
|
github.com/redis/go-redis/v9 v9.6.2
|
||||||
|
go.unistack.org/micro/v3 v3.10.97
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
|
github.com/google/go-cmp v0.5.9 // indirect
|
||||||
|
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
||||||
|
google.golang.org/protobuf v1.35.1 // indirect
|
||||||
)
|
)
|
||||||
|
20
go.sum
20
go.sum
@ -2,11 +2,19 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
|||||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
|
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||||
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
|
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4=
|
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2 h1:oBlErygFka9FAVfowzW7sRUfB7n31YX0aFzfaOflM3w=
|
||||||
go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
github.com/redis/go-redis/extra/rediscmd/v9 v9.6.2/go.mod h1:jYLUE5tC8UsFnpSclhEjZlFLMMtHH4jTfFMnshNWkoo=
|
||||||
|
github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk=
|
||||||
|
github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
|
||||||
|
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
||||||
|
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
||||||
|
go.unistack.org/micro/v3 v3.10.97 h1:8l7fv+i06/PjPrBBhRC/ZQkWGIOuHPg3jJN0vktYE78=
|
||||||
|
go.unistack.org/micro/v3 v3.10.97/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g=
|
||||||
|
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
|
||||||
|
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||||
|
18
options.go
18
options.go
@ -7,20 +7,13 @@ import (
|
|||||||
"go.unistack.org/micro/v3/broker"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v3/logger"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v3/meter"
|
||||||
"go.unistack.org/micro/v3/store"
|
|
||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
type configKey struct{}
|
type configKey struct{}
|
||||||
|
|
||||||
func Config(c *redis.Options) store.Option {
|
func Config(c *redis.UniversalOptions) broker.Option {
|
||||||
return store.SetOption(configKey{}, c)
|
return broker.SetOption(configKey{}, c)
|
||||||
}
|
|
||||||
|
|
||||||
type clusterConfigKey struct{}
|
|
||||||
|
|
||||||
func ClusterConfig(c *redis.ClusterOptions) store.Option {
|
|
||||||
return store.SetOption(clusterConfigKey{}, c)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -52,19 +45,12 @@ func NewOptions(opts ...Option) Options {
|
|||||||
Meter: meter.DefaultMeter,
|
Meter: meter.DefaultMeter,
|
||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
MeterStatsInterval: DefaultMeterStatsInterval,
|
MeterStatsInterval: DefaultMeterStatsInterval,
|
||||||
MeterMetricPrefix: DefaultMeterMetricPrefix,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
options.Meter = options.Meter.Clone(
|
|
||||||
meter.MetricPrefix(options.MeterMetricPrefix),
|
|
||||||
)
|
|
||||||
|
|
||||||
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
|
|
||||||
|
|
||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
40
redis.go
40
redis.go
@ -35,9 +35,9 @@ var (
|
|||||||
// Event is an broker.Event
|
// Event is an broker.Event
|
||||||
type Event struct {
|
type Event struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
topic string
|
|
||||||
msg *broker.Message
|
|
||||||
err error
|
err error
|
||||||
|
msg *broker.Message
|
||||||
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Topic returns the topic this Event applies to.
|
// Topic returns the topic this Event applies to.
|
||||||
@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
||||||
|
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.msg.Body = codec.RawMessage(msg.Payload)
|
p.msg.Body = codec.RawMessage(msg.Payload)
|
||||||
if eh != nil {
|
if eh != nil {
|
||||||
@ -223,6 +225,7 @@ func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broke
|
|||||||
// Subscribe returns a broker.Subscriber for the topic and handler
|
// Subscribe returns a broker.Subscriber for the topic and handler
|
||||||
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
s := &Subscriber{
|
s := &Subscriber{
|
||||||
|
ctx: ctx,
|
||||||
topic: topic,
|
topic: topic,
|
||||||
handle: handler,
|
handle: handler,
|
||||||
opts: b.opts,
|
opts: b.opts,
|
||||||
@ -230,45 +233,41 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
|||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the receiver routine.
|
|
||||||
go s.loop()
|
|
||||||
|
|
||||||
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
||||||
if err := s.sub.Ping(ctx, ""); err != nil {
|
if err := s.sub.Ping(ctx, ""); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go s.loop()
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) configure() error {
|
func (b *Broker) configure() error {
|
||||||
redisOptions := DefaultOptions
|
redisOptions := DefaultOptions
|
||||||
|
|
||||||
if b.cli != nil && b.opts.Context == nil {
|
if b.opts.Context != nil {
|
||||||
return nil
|
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
|
||||||
|
redisOptions = c
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.opts.Context != nil {
|
if len(b.opts.Addrs) > 0 {
|
||||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
|
redisOptions.Addrs = b.opts.Addrs
|
||||||
redisOptions = c
|
}
|
||||||
if b.opts.TLSConfig != nil {
|
|
||||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
if b.opts.TLSConfig != nil {
|
||||||
}
|
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions == nil && b.cli != nil {
|
if redisOptions == nil && b.cli != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions == nil {
|
|
||||||
redisOptions.Addrs = b.opts.Addrs
|
|
||||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
c := redis.NewUniversalClient(redisOptions)
|
c := redis.NewUniversalClient(redisOptions)
|
||||||
setTracing(c, b.opts.Tracer)
|
setTracing(c, b.opts.Tracer)
|
||||||
|
|
||||||
|
b.cli = c
|
||||||
b.statsMeter()
|
b.statsMeter()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -280,6 +279,7 @@ func (b *Broker) Connect(ctx context.Context) error {
|
|||||||
err = b.cli.Ping(ctx).Err()
|
err = b.cli.Ping(ctx).Err()
|
||||||
}
|
}
|
||||||
setSpanError(ctx, err)
|
setSpanError(ctx, err)
|
||||||
|
b.done = make(chan struct{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -318,5 +318,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(opts ...broker.Option) *Broker {
|
func NewBroker(opts ...broker.Option) *Broker {
|
||||||
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
|
return &Broker{opts: broker.NewOptions(opts...)}
|
||||||
}
|
}
|
||||||
|
4
stats.go
4
stats.go
@ -13,10 +13,6 @@ var (
|
|||||||
PoolConnTotalCurrent = "pool_conn_total_current"
|
PoolConnTotalCurrent = "pool_conn_total_current"
|
||||||
PoolConnIdleCurrent = "pool_conn_idle_current"
|
PoolConnIdleCurrent = "pool_conn_idle_current"
|
||||||
PoolConnStaleTotal = "pool_conn_stale_total"
|
PoolConnStaleTotal = "pool_conn_stale_total"
|
||||||
|
|
||||||
meterRequestTotal = "request_total"
|
|
||||||
meterRequestLatencyMicroseconds = "latency_microseconds"
|
|
||||||
meterRequestDurationSeconds = "request_duration_seconds"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Statser interface {
|
type Statser interface {
|
||||||
|
Loading…
Reference in New Issue
Block a user