4 Commits

Author SHA1 Message Date
c7b6158602 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:17:59 +03:00
35b4ea057c fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:08:01 +03:00
46fbd9846a fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:56:52 +03:00
002a038413 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:45:22 +03:00

View File

@@ -35,9 +35,9 @@ var (
// Event is an broker.Event
type Event struct {
ctx context.Context
topic string
msg *broker.Message
err error
msg *broker.Message
topic string
}
// Topic returns the topic this Event applies to.
@@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
}
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
if err != nil {
p.msg.Body = codec.RawMessage(msg.Payload)
if eh != nil {
@@ -230,45 +232,41 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
done: make(chan struct{}),
}
// Run the receiver routine.
go s.loop()
s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err
}
go s.loop()
return s, nil
}
func (b *Broker) configure() error {
redisOptions := DefaultOptions
if b.cli != nil && b.opts.Context == nil {
return nil
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
redisOptions = c
}
}
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
redisOptions = c
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
}
if len(b.opts.Addrs) > 0 {
redisOptions.Addrs = b.opts.Addrs
}
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
if redisOptions == nil && b.cli != nil {
return nil
}
if redisOptions == nil {
redisOptions.Addrs = b.opts.Addrs
redisOptions.TLSConfig = b.opts.TLSConfig
}
c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer)
b.cli = c
b.statsMeter()
return nil
@@ -280,6 +278,7 @@ func (b *Broker) Connect(ctx context.Context) error {
err = b.cli.Ping(ctx).Err()
}
setSpanError(ctx, err)
b.done = make(chan struct{})
return err
}
@@ -318,5 +317,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
}
func NewBroker(opts ...broker.Option) *Broker {
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
return &Broker{opts: broker.NewOptions(opts...)}
}