Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
dbbdd81a57 | |||
c7b6158602 | |||
35b4ea057c | |||
46fbd9846a | |||
002a038413 |
36
redis.go
36
redis.go
@@ -35,9 +35,9 @@ var (
|
|||||||
// Event is an broker.Event
|
// Event is an broker.Event
|
||||||
type Event struct {
|
type Event struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
topic string
|
|
||||||
msg *broker.Message
|
|
||||||
err error
|
err error
|
||||||
|
msg *broker.Message
|
||||||
|
topic string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Topic returns the topic this Event applies to.
|
// Topic returns the topic this Event applies to.
|
||||||
@@ -106,6 +106,8 @@ func (s *Subscriber) loop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
||||||
|
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.msg.Body = codec.RawMessage(msg.Payload)
|
p.msg.Body = codec.RawMessage(msg.Payload)
|
||||||
if eh != nil {
|
if eh != nil {
|
||||||
@@ -223,6 +225,7 @@ func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broke
|
|||||||
// Subscribe returns a broker.Subscriber for the topic and handler
|
// Subscribe returns a broker.Subscriber for the topic and handler
|
||||||
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||||
s := &Subscriber{
|
s := &Subscriber{
|
||||||
|
ctx: ctx,
|
||||||
topic: topic,
|
topic: topic,
|
||||||
handle: handler,
|
handle: handler,
|
||||||
opts: b.opts,
|
opts: b.opts,
|
||||||
@@ -230,45 +233,41 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
|||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the receiver routine.
|
|
||||||
go s.loop()
|
|
||||||
|
|
||||||
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
||||||
if err := s.sub.Ping(ctx, ""); err != nil {
|
if err := s.sub.Ping(ctx, ""); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go s.loop()
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) configure() error {
|
func (b *Broker) configure() error {
|
||||||
redisOptions := DefaultOptions
|
redisOptions := DefaultOptions
|
||||||
|
|
||||||
if b.cli != nil && b.opts.Context == nil {
|
if b.opts.Context != nil {
|
||||||
return nil
|
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
|
||||||
|
redisOptions = c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(b.opts.Addrs) > 0 {
|
||||||
|
redisOptions.Addrs = b.opts.Addrs
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.opts.Context != nil {
|
|
||||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
|
|
||||||
redisOptions = c
|
|
||||||
if b.opts.TLSConfig != nil {
|
if b.opts.TLSConfig != nil {
|
||||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if redisOptions == nil && b.cli != nil {
|
if redisOptions == nil && b.cli != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if redisOptions == nil {
|
|
||||||
redisOptions.Addrs = b.opts.Addrs
|
|
||||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
c := redis.NewUniversalClient(redisOptions)
|
c := redis.NewUniversalClient(redisOptions)
|
||||||
setTracing(c, b.opts.Tracer)
|
setTracing(c, b.opts.Tracer)
|
||||||
|
|
||||||
|
b.cli = c
|
||||||
b.statsMeter()
|
b.statsMeter()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -280,6 +279,7 @@ func (b *Broker) Connect(ctx context.Context) error {
|
|||||||
err = b.cli.Ping(ctx).Err()
|
err = b.cli.Ping(ctx).Err()
|
||||||
}
|
}
|
||||||
setSpanError(ctx, err)
|
setSpanError(ctx, err)
|
||||||
|
b.done = make(chan struct{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -318,5 +318,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(opts ...broker.Option) *Broker {
|
func NewBroker(opts ...broker.Option) *Broker {
|
||||||
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
|
return &Broker{opts: broker.NewOptions(opts...)}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user