6 Commits

Author SHA1 Message Date
de0af10391 update deps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 18:08:47 +03:00
dbbdd81a57 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:30:46 +03:00
c7b6158602 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:17:59 +03:00
35b4ea057c fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:08:01 +03:00
46fbd9846a fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:56:52 +03:00
002a038413 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:45:22 +03:00
4 changed files with 34 additions and 27 deletions

6
go.mod
View File

@@ -5,10 +5,12 @@ go 1.22
require ( require (
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3
github.com/redis/go-redis/v9 v9.6.1 github.com/redis/go-redis/v9 v9.6.1
go.unistack.org/micro/v3 v3.10.84 go.unistack.org/micro/v3 v3.10.91
) )
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
) )

10
go.sum
View File

@@ -4,11 +4,21 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20=
github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ= github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4= go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4=
go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
go.unistack.org/micro/v3 v3.10.91 h1:vuJY4tXwpqimwIkEJ3TozMYNVQQs+C5QMlQWPgSY/YM=
go.unistack.org/micro/v3 v3.10.91/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=

View File

@@ -45,17 +45,12 @@ func NewOptions(opts ...Option) Options {
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
MeterStatsInterval: DefaultMeterStatsInterval, MeterStatsInterval: DefaultMeterStatsInterval,
MeterMetricPrefix: DefaultMeterMetricPrefix,
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
options.Meter = options.Meter.Clone(
meter.MetricPrefix(options.MeterMetricPrefix),
)
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1)) options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
return options return options

View File

@@ -35,9 +35,9 @@ var (
// Event is an broker.Event // Event is an broker.Event
type Event struct { type Event struct {
ctx context.Context ctx context.Context
topic string
msg *broker.Message
err error err error
msg *broker.Message
topic string
} }
// Topic returns the topic this Event applies to. // Topic returns the topic this Event applies to.
@@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
} }
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg) err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
if err != nil { if err != nil {
p.msg.Body = codec.RawMessage(msg.Payload) p.msg.Body = codec.RawMessage(msg.Payload)
if eh != nil { if eh != nil {
@@ -223,6 +225,7 @@ func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broke
// Subscribe returns a broker.Subscriber for the topic and handler // Subscribe returns a broker.Subscriber for the topic and handler
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
s := &Subscriber{ s := &Subscriber{
ctx: ctx,
topic: topic, topic: topic,
handle: handler, handle: handler,
opts: b.opts, opts: b.opts,
@@ -230,45 +233,41 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
done: make(chan struct{}), done: make(chan struct{}),
} }
// Run the receiver routine.
go s.loop()
s.sub = b.cli.Subscribe(s.ctx, s.topic) s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil { if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err return nil, err
} }
go s.loop()
return s, nil return s, nil
} }
func (b *Broker) configure() error { func (b *Broker) configure() error {
redisOptions := DefaultOptions redisOptions := DefaultOptions
if b.cli != nil && b.opts.Context == nil { if b.opts.Context != nil {
return nil if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
redisOptions = c
}
}
if len(b.opts.Addrs) > 0 {
redisOptions.Addrs = b.opts.Addrs
} }
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
redisOptions = c
if b.opts.TLSConfig != nil { if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig redisOptions.TLSConfig = b.opts.TLSConfig
} }
}
}
if redisOptions == nil && b.cli != nil { if redisOptions == nil && b.cli != nil {
return nil return nil
} }
if redisOptions == nil {
redisOptions.Addrs = b.opts.Addrs
redisOptions.TLSConfig = b.opts.TLSConfig
}
c := redis.NewUniversalClient(redisOptions) c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer) setTracing(c, b.opts.Tracer)
b.cli = c
b.statsMeter() b.statsMeter()
return nil return nil
@@ -280,6 +279,7 @@ func (b *Broker) Connect(ctx context.Context) error {
err = b.cli.Ping(ctx).Err() err = b.cli.Ping(ctx).Err()
} }
setSpanError(ctx, err) setSpanError(ctx, err)
b.done = make(chan struct{})
return err return err
} }
@@ -318,5 +318,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
} }
func NewBroker(opts ...broker.Option) *Broker { func NewBroker(opts ...broker.Option) *Broker {
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)} return &Broker{opts: broker.NewOptions(opts...)}
} }