Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
c7b6158602 | |||
35b4ea057c |
10
redis.go
10
redis.go
@@ -35,9 +35,9 @@ var (
|
||||
// Event is an broker.Event
|
||||
type Event struct {
|
||||
ctx context.Context
|
||||
topic string
|
||||
msg *broker.Message
|
||||
err error
|
||||
msg *broker.Message
|
||||
topic string
|
||||
}
|
||||
|
||||
// Topic returns the topic this Event applies to.
|
||||
@@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
|
||||
}
|
||||
|
||||
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
||||
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
|
||||
|
||||
if err != nil {
|
||||
p.msg.Body = codec.RawMessage(msg.Payload)
|
||||
if eh != nil {
|
||||
@@ -264,6 +266,7 @@ func (b *Broker) configure() error {
|
||||
c := redis.NewUniversalClient(redisOptions)
|
||||
setTracing(c, b.opts.Tracer)
|
||||
|
||||
b.cli = c
|
||||
b.statsMeter()
|
||||
|
||||
return nil
|
||||
@@ -275,6 +278,7 @@ func (b *Broker) Connect(ctx context.Context) error {
|
||||
err = b.cli.Ping(ctx).Err()
|
||||
}
|
||||
setSpanError(ctx, err)
|
||||
b.done = make(chan struct{})
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -313,5 +317,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func NewBroker(opts ...broker.Option) *Broker {
|
||||
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
|
||||
return &Broker{opts: broker.NewOptions(opts...)}
|
||||
}
|
||||
|
Reference in New Issue
Block a user