338 lines
7.8 KiB
Go
Raw Permalink Normal View History

package redis
import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv"
)
var DefaultOptions = &redis.UniversalOptions{
Username: "",
Password: "", // no password set
DB: 0, // use default DB
MaxRetries: 2,
MaxRetryBackoff: 256 * time.Millisecond,
DialTimeout: 1 * time.Second,
ReadTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
PoolTimeout: 1 * time.Second,
MinIdleConns: 10,
}
var (
_ broker.Broker = (*Broker)(nil)
_ broker.Event = (*Event)(nil)
_ broker.Subscriber = (*Subscriber)(nil)
)
// Event is an broker.Event
type Event struct {
ctx context.Context
err error
msg *broker.Message
topic string
}
// Topic returns the topic this Event applies to.
func (p *Event) Context() context.Context {
return p.ctx
}
// Topic returns the topic this Event
func (p *Event) Topic() string {
return p.topic
}
// Message returns the broker message
func (p *Event) Message() *broker.Message {
return p.msg
}
// Ack sends an acknowledgement to the broker. However this is not supported
// is Redis and therefore this is a no-op.
func (p *Event) Ack() error {
return nil
}
func (p *Event) Error() error {
return p.err
}
func (p *Event) SetError(err error) {
p.err = err
}
// Subscriber implements broker.Subscriber interface
type Subscriber struct {
ctx context.Context
done chan struct{}
sub *redis.PubSub
topic string
handle broker.Handler
opts broker.Options
sopts broker.SubscribeOptions
}
// recv loops to receive new messages from Redis and handle them
// as Events.
func (s *Subscriber) loop() {
maxInflight := DefaultSubscribeMaxInflight
if s.opts.Context != nil {
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
maxInflight = n
}
}
eh := s.opts.ErrorHandler
if s.sopts.ErrorHandler != nil {
eh = s.sopts.ErrorHandler
}
for {
select {
case <-s.done:
return
case msg := <-s.sub.Channel(redis.WithChannelSize(maxInflight)):
p := &Event{
topic: msg.Channel,
msg: &broker.Message{},
}
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
if err != nil {
p.msg.Body = codec.RawMessage(msg.Payload)
if eh != nil {
_ = eh(p)
continue
}
s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("codec.Unmarshal error %v", err))
}
if p.err = s.handle(p); p.err != nil {
if eh != nil {
_ = eh(p)
continue
}
s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("handle error %v", err))
}
if s.sopts.AutoAck {
if err := p.Ack(); err != nil {
s.opts.Logger.Fatal(s.ctx, "auto ack error", err)
}
}
}
}
}
// Options returns the Subscriber options.
func (s *Subscriber) Options() broker.SubscribeOptions {
return s.sopts
}
// Topic returns the topic of the Subscriber.
func (s *Subscriber) Topic() string {
return s.topic
}
// Unsubscribe unsubscribes the Subscriber and frees the connection.
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
return s.sub.Unsubscribe(ctx, s.topic)
}
// Broker implements broker.Broker interface
type Broker struct {
opts broker.Options
cli redis.UniversalClient
done chan struct{}
connected *atomic.Uint32
}
func (b *Broker) Live() bool {
return b.connected.Load() == 1
}
func (b *Broker) Ready() bool {
return b.connected.Load() == 1
}
func (b *Broker) Health() bool {
return b.connected.Load() == 1
}
// String returns the name of the broker implementation
func (b *Broker) String() string {
return "redis"
}
// Name returns the name of the broker
func (b *Broker) Name() string {
return b.opts.Name
}
// Options returns the broker.Broker Options
func (b *Broker) Options() broker.Options {
return b.opts
}
// Address returns the address the broker will use to create new connections
func (b *Broker) Address() string {
return strings.Join(b.opts.Addrs, ",")
}
func (b *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
return b.publish(ctx, msgs, opts...)
}
func (b *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
msg.Header.Set(metadata.HeaderTopic, topic)
return b.publish(ctx, []*broker.Message{msg}, opts...)
}
func (b *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
options := broker.NewPublishOptions(opts...)
for _, msg := range msgs {
var record string
topic, _ := msg.Header.Get(metadata.HeaderTopic)
msg.Header.Del(metadata.HeaderTopic)
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Inc()
if options.BodyOnly || b.opts.Codec.String() == "noop" {
record = string(msg.Body)
} else {
buf, err := b.opts.Codec.Marshal(msg)
if err != nil {
return err
}
record = string(buf)
}
ts := time.Now()
if err := b.cli.Publish(ctx, topic, record); err != nil {
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "failure").Inc()
} else {
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "success").Inc()
}
te := time.Since(ts)
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Dec()
}
return nil
}
// Subscribe returns a broker.BatchSubscriber for the topic and handler
func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
// Subscribe returns a broker.Subscriber for the topic and handler
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
s := &Subscriber{
ctx: ctx,
topic: topic,
handle: handler,
opts: b.opts,
sopts: broker.NewSubscribeOptions(opts...),
done: make(chan struct{}),
}
s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err
}
go s.loop()
return s, nil
}
func (b *Broker) configure(opts ...broker.Option) error {
if b.connected.Load() == 1 && len(opts) == 0 {
return nil
}
redisOptions := DefaultOptions
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
redisOptions = c
}
}
if len(b.opts.Addrs) > 0 {
redisOptions.Addrs = b.opts.Addrs
}
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer)
b.cli = c
b.statsMeter()
return nil
}
func (b *Broker) Connect(ctx context.Context) error {
if b.connected.Load() == 1 {
return nil
}
var err error
if b.cli != nil {
err = b.cli.Ping(ctx).Err()
}
setSpanError(ctx, err)
b.done = make(chan struct{})
return err
}
func (b *Broker) Init(opts ...broker.Option) error {
err := b.configure(opts...)
if err != nil {
return err
}
return nil
}
func (b *Broker) Client() redis.UniversalClient {
if b.cli != nil {
return b.cli
}
return nil
}
func (b *Broker) Disconnect(ctx context.Context) error {
if b.connected.Load() == 0 {
return nil
}
var err error
select {
case <-b.done:
return err
default:
if b.cli != nil {
err = b.cli.Close()
}
close(b.done)
return err
}
}
func NewBroker(opts ...broker.Option) *Broker {
return &Broker{connected: &atomic.Uint32{}, opts: broker.NewOptions(opts...)}
}