Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
46fbd9846a | |||
002a038413 |
29
redis.go
29
redis.go
@@ -230,42 +230,37 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Run the receiver routine.
|
||||
go s.loop()
|
||||
|
||||
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
||||
if err := s.sub.Ping(ctx, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go s.loop()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (b *Broker) configure() error {
|
||||
redisOptions := DefaultOptions
|
||||
|
||||
if b.cli != nil && b.opts.Context == nil {
|
||||
return nil
|
||||
if b.opts.Context != nil {
|
||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
|
||||
redisOptions = c
|
||||
}
|
||||
}
|
||||
|
||||
if b.opts.Context != nil {
|
||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
|
||||
redisOptions = c
|
||||
if b.opts.TLSConfig != nil {
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
}
|
||||
if len(b.opts.Addrs) > 0 {
|
||||
redisOptions.Addrs = b.opts.Addrs
|
||||
}
|
||||
|
||||
if b.opts.TLSConfig != nil {
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
|
||||
if redisOptions == nil && b.cli != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if redisOptions == nil {
|
||||
redisOptions.Addrs = b.opts.Addrs
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
|
||||
c := redis.NewUniversalClient(redisOptions)
|
||||
setTracing(c, b.opts.Tracer)
|
||||
|
||||
|
Reference in New Issue
Block a user