2 Commits

Author SHA1 Message Date
dbbdd81a57 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:30:46 +03:00
c7b6158602 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:17:59 +03:00

View File

@@ -35,9 +35,9 @@ var (
// Event is an broker.Event // Event is an broker.Event
type Event struct { type Event struct {
ctx context.Context ctx context.Context
topic string
msg *broker.Message
err error err error
msg *broker.Message
topic string
} }
// Topic returns the topic this Event applies to. // Topic returns the topic this Event applies to.
@@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
} }
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg) err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
if err != nil { if err != nil {
p.msg.Body = codec.RawMessage(msg.Payload) p.msg.Body = codec.RawMessage(msg.Payload)
if eh != nil { if eh != nil {
@@ -223,6 +225,7 @@ func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broke
// Subscribe returns a broker.Subscriber for the topic and handler // Subscribe returns a broker.Subscriber for the topic and handler
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
s := &Subscriber{ s := &Subscriber{
ctx: ctx,
topic: topic, topic: topic,
handle: handler, handle: handler,
opts: b.opts, opts: b.opts,
@@ -276,6 +279,7 @@ func (b *Broker) Connect(ctx context.Context) error {
err = b.cli.Ping(ctx).Err() err = b.cli.Ping(ctx).Err()
} }
setSpanError(ctx, err) setSpanError(ctx, err)
b.done = make(chan struct{})
return err return err
} }
@@ -314,5 +318,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
} }
func NewBroker(opts ...broker.Option) *Broker { func NewBroker(opts ...broker.Option) *Broker {
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)} return &Broker{opts: broker.NewOptions(opts...)}
} }