2 Commits

Author SHA1 Message Date
46fbd9846a fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:56:52 +03:00
002a038413 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:45:22 +03:00

View File

@@ -230,42 +230,37 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
done: make(chan struct{}), done: make(chan struct{}),
} }
// Run the receiver routine.
go s.loop()
s.sub = b.cli.Subscribe(s.ctx, s.topic) s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil { if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err return nil, err
} }
go s.loop()
return s, nil return s, nil
} }
func (b *Broker) configure() error { func (b *Broker) configure() error {
redisOptions := DefaultOptions redisOptions := DefaultOptions
if b.cli != nil && b.opts.Context == nil { if b.opts.Context != nil {
return nil if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
redisOptions = c
}
} }
if b.opts.Context != nil { if len(b.opts.Addrs) > 0 {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok { redisOptions.Addrs = b.opts.Addrs
redisOptions = c }
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig if b.opts.TLSConfig != nil {
} redisOptions.TLSConfig = b.opts.TLSConfig
}
} }
if redisOptions == nil && b.cli != nil { if redisOptions == nil && b.cli != nil {
return nil return nil
} }
if redisOptions == nil {
redisOptions.Addrs = b.opts.Addrs
redisOptions.TLSConfig = b.opts.TLSConfig
}
c := redis.NewUniversalClient(redisOptions) c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer) setTracing(c, b.opts.Tracer)