diff --git a/redis.go b/redis.go index 27ecb72..bf6c648 100644 --- a/redis.go +++ b/redis.go @@ -35,9 +35,9 @@ var ( // Event is an broker.Event type Event struct { ctx context.Context - topic string - msg *broker.Message err error + msg *broker.Message + topic string } // Topic returns the topic this Event applies to. @@ -106,6 +106,8 @@ func (s *Subscriber) loop() { } err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg) + p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header) + if err != nil { p.msg.Body = codec.RawMessage(msg.Payload) if eh != nil { @@ -276,6 +278,7 @@ func (b *Broker) Connect(ctx context.Context) error { err = b.cli.Ping(ctx).Err() } setSpanError(ctx, err) + b.done = make(chan struct{}) return err } @@ -314,5 +317,5 @@ func (b *Broker) Disconnect(ctx context.Context) error { } func NewBroker(opts ...broker.Option) *Broker { - return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)} + return &Broker{opts: broker.NewOptions(opts...)} }