fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
35b4ea057c
commit
c7b6158602
9
redis.go
9
redis.go
@ -35,9 +35,9 @@ var (
|
|||||||
// Event is an broker.Event
|
// Event is an broker.Event
|
||||||
type Event struct {
|
type Event struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
topic string
|
|
||||||
msg *broker.Message
|
|
||||||
err error
|
err error
|
||||||
|
msg *broker.Message
|
||||||
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Topic returns the topic this Event applies to.
|
// Topic returns the topic this Event applies to.
|
||||||
@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
||||||
|
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.msg.Body = codec.RawMessage(msg.Payload)
|
p.msg.Body = codec.RawMessage(msg.Payload)
|
||||||
if eh != nil {
|
if eh != nil {
|
||||||
@ -276,6 +278,7 @@ func (b *Broker) Connect(ctx context.Context) error {
|
|||||||
err = b.cli.Ping(ctx).Err()
|
err = b.cli.Ping(ctx).Err()
|
||||||
}
|
}
|
||||||
setSpanError(ctx, err)
|
setSpanError(ctx, err)
|
||||||
|
b.done = make(chan struct{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -314,5 +317,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(opts ...broker.Option) *Broker {
|
func NewBroker(opts ...broker.Option) *Broker {
|
||||||
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
|
return &Broker{opts: broker.NewOptions(opts...)}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user