commit 9ddd01b5d76bacbd08f271b67f556fbd9ddf2e28 Author: Vasiliy Tolstov Date: Mon Sep 9 18:36:04 2024 +0300 initial import Signed-off-by: Vasiliy Tolstov diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..09f4136 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module go.unistack.org/micro-broker-redis/v3 + +go 1.23.1 + +require ( + github.com/redis/go-redis/v9 v9.6.1 + go.unistack.org/micro/v3 v3.10.84 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a330a1d --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4= +go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= diff --git a/options.go b/options.go new file mode 100644 index 0000000..275fa68 --- /dev/null +++ b/options.go @@ -0,0 +1,90 @@ +package redis + +import ( + "time" + + "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/logger" + "go.unistack.org/micro/v3/meter" + "go.unistack.org/micro/v3/store" + "go.unistack.org/micro/v3/tracer" +) + +type configKey struct{} + +func Config(c *redis.Options) store.Option { + return store.SetOption(configKey{}, c) +} + +type clusterConfigKey struct{} + +func ClusterConfig(c *redis.ClusterOptions) store.Option { + return store.SetOption(clusterConfigKey{}, c) +} + +var ( + // DefaultSubscribeMaxInflight specifies how much messages keep inflight + DefaultSubscribeMaxInflight = 100 + + // DefaultMeterStatsInterval holds default stats interval + DefaultMeterStatsInterval = 5 * time.Second + // DefaultMeterMetricPrefix holds default metric prefix + DefaultMeterMetricPrefix = "micro_broker_" +) + +// Options struct holds wrapper options +type Options struct { + Logger logger.Logger + Meter meter.Meter + Tracer tracer.Tracer + MeterMetricPrefix string + MeterStatsInterval time.Duration +} + +// Option func signature +type Option func(*Options) + +// NewOptions create new Options struct from provided option slice +func NewOptions(opts ...Option) Options { + options := Options{ + Logger: logger.DefaultLogger, + Meter: meter.DefaultMeter, + Tracer: tracer.DefaultTracer, + MeterStatsInterval: DefaultMeterStatsInterval, + MeterMetricPrefix: DefaultMeterMetricPrefix, + } + + for _, o := range opts { + o(&options) + } + + options.Meter = options.Meter.Clone( + meter.MetricPrefix(options.MeterMetricPrefix), + ) + + options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1)) + + return options +} + +// MetricInterval specifies stats interval for *sql.DB +func MetricInterval(td time.Duration) Option { + return func(o *Options) { + o.MeterStatsInterval = td + } +} + +// MetricPrefix specifies prefix for each metric +func MetricPrefix(pref string) Option { + return func(o *Options) { + o.MeterMetricPrefix = pref + } +} + +type subscribeMaxInflightKey struct{} + +// SubscribeMaxInFlight max queued messages +func SubscribeMaxInFlight(n int) broker.SubscribeOption { + return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n) +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..59fd37b --- /dev/null +++ b/redis.go @@ -0,0 +1,322 @@ +package redis + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/semconv" +) + +var DefaultOptions = &redis.UniversalOptions{ + Username: "", + Password: "", // no password set + DB: 0, // use default DB + MaxRetries: 2, + MaxRetryBackoff: 256 * time.Millisecond, + DialTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + PoolTimeout: 1 * time.Second, + MinIdleConns: 10, +} + +var ( + _ broker.Broker = (*Broker)(nil) + _ broker.Event = (*Event)(nil) + _ broker.Subscriber = (*Subscriber)(nil) +) + +// Event is an broker.Event +type Event struct { + ctx context.Context + topic string + msg *broker.Message + err error +} + +// Topic returns the topic this Event applies to. +func (p *Event) Context() context.Context { + return p.ctx +} + +// Topic returns the topic this Event +func (p *Event) Topic() string { + return p.topic +} + +// Message returns the broker message +func (p *Event) Message() *broker.Message { + return p.msg +} + +// Ack sends an acknowledgement to the broker. However this is not supported +// is Redis and therefore this is a no-op. +func (p *Event) Ack() error { + return nil +} + +func (p *Event) Error() error { + return p.err +} + +func (p *Event) SetError(err error) { + p.err = err +} + +// Subscriber implements broker.Subscriber interface +type Subscriber struct { + ctx context.Context + done chan struct{} + sub *redis.PubSub + topic string + handle broker.Handler + opts broker.Options + sopts broker.SubscribeOptions +} + +// recv loops to receive new messages from Redis and handle them +// as Events. +func (s *Subscriber) loop() { + maxInflight := DefaultSubscribeMaxInflight + if s.opts.Context != nil { + if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok { + maxInflight = n + } + } + + eh := s.opts.ErrorHandler + if s.sopts.ErrorHandler != nil { + eh = s.sopts.ErrorHandler + } + + for { + select { + case <-s.done: + return + case msg := <-s.sub.Channel(redis.WithChannelSize(maxInflight)): + p := &Event{ + topic: msg.Channel, + msg: &broker.Message{}, + } + + err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg) + if err != nil { + p.msg.Body = codec.RawMessage(msg.Payload) + if eh != nil { + _ = eh(p) + continue + } + s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("codec.Unmarshal error %v", err)) + } + + if p.err = s.handle(p); p.err != nil { + if eh != nil { + _ = eh(p) + continue + } + s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("handle error %v", err)) + + } + + if s.sopts.AutoAck { + if err := p.Ack(); err != nil { + s.opts.Logger.Fatal(s.ctx, "auto ack error", err) + } + } + } + } +} + +// Options returns the Subscriber options. +func (s *Subscriber) Options() broker.SubscribeOptions { + return s.sopts +} + +// Topic returns the topic of the Subscriber. +func (s *Subscriber) Topic() string { + return s.topic +} + +// Unsubscribe unsubscribes the Subscriber and frees the connection. +func (s *Subscriber) Unsubscribe(ctx context.Context) error { + return s.sub.Unsubscribe(ctx, s.topic) +} + +// Broker implements broker.Broker interface +type Broker struct { + opts broker.Options + cli redis.UniversalClient + done chan struct{} +} + +// String returns the name of the broker implementation +func (b *Broker) String() string { + return "redis" +} + +// Name returns the name of the broker +func (b *Broker) Name() string { + return b.opts.Name +} + +// Options returns the broker.Broker Options +func (b *Broker) Options() broker.Options { + return b.opts +} + +// Address returns the address the broker will use to create new connections +func (b *Broker) Address() string { + return strings.Join(b.opts.Addrs, ",") +} + +func (b *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + return b.publish(ctx, msgs, opts...) +} + +func (b *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error { + msg.Header.Set(metadata.HeaderTopic, topic) + return b.publish(ctx, []*broker.Message{msg}, opts...) +} + +func (b *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error { + options := broker.NewPublishOptions(opts...) + + for _, msg := range msgs { + var record string + topic, _ := msg.Header.Get(metadata.HeaderTopic) + msg.Header.Del(metadata.HeaderTopic) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Inc() + if options.BodyOnly || b.opts.Codec.String() == "noop" { + record = string(msg.Body) + } else { + buf, err := b.opts.Codec.Marshal(msg) + if err != nil { + return err + } + record = string(buf) + } + ts := time.Now() + if err := b.cli.Publish(ctx, topic, record); err != nil { + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "failure").Inc() + } else { + b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "success").Inc() + } + te := time.Since(ts) + b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", topic, "topic", topic).Update(te.Seconds()) + b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", topic, "topic", topic).Update(te.Seconds()) + b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Dec() + } + return nil +} + +// Subscribe returns a broker.BatchSubscriber for the topic and handler +func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + return nil, nil +} + +// Subscribe returns a broker.Subscriber for the topic and handler +func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + s := &Subscriber{ + topic: topic, + handle: handler, + opts: b.opts, + sopts: broker.NewSubscribeOptions(opts...), + done: make(chan struct{}), + } + + // Run the receiver routine. + go s.loop() + + s.sub = b.cli.Subscribe(s.ctx, s.topic) + if err := s.sub.Ping(ctx, ""); err != nil { + return nil, err + } + + return s, nil +} + +func (b *Broker) configure() error { + redisOptions := DefaultOptions + + if b.cli != nil && b.opts.Context == nil { + return nil + } + + if b.opts.Context != nil { + if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok { + redisOptions = c + if b.opts.TLSConfig != nil { + redisOptions.TLSConfig = b.opts.TLSConfig + } + } + } + + if redisOptions == nil && b.cli != nil { + return nil + } + + if redisOptions == nil { + redisOptions.Addrs = b.opts.Addrs + redisOptions.TLSConfig = b.opts.TLSConfig + } + + c := redis.NewUniversalClient(redisOptions) + setTracing(c, b.opts.Tracer) + + b.statsMeter() + + return nil +} + +func (b *Broker) Connect(ctx context.Context) error { + var err error + if b.cli != nil { + err = b.cli.Ping(ctx).Err() + } + setSpanError(ctx, err) + return err +} + +func (b *Broker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&b.opts) + } + + err := b.configure() + if err != nil { + return err + } + + return nil +} + +func (b *Broker) Client() redis.UniversalClient { + if b.cli != nil { + return b.cli + } + return nil +} + +func (b *Broker) Disconnect(ctx context.Context) error { + var err error + select { + case <-b.done: + return err + default: + if b.cli != nil { + err = b.cli.Close() + } + close(b.done) + return err + } +} + +func NewBroker(opts ...broker.Option) *Broker { + return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)} +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..8289422 --- /dev/null +++ b/stats.go @@ -0,0 +1,49 @@ +package redis + +import ( + "time" + + "github.com/redis/go-redis/v9" +) + +var ( + PoolHitsTotal = "pool_hits_total" + PoolMissesTotal = "pool_misses_total" + PoolTimeoutTotal = "pool_timeout_total" + PoolConnTotalCurrent = "pool_conn_total_current" + PoolConnIdleCurrent = "pool_conn_idle_current" + PoolConnStaleTotal = "pool_conn_stale_total" + + meterRequestTotal = "request_total" + meterRequestLatencyMicroseconds = "latency_microseconds" + meterRequestDurationSeconds = "request_duration_seconds" +) + +type Statser interface { + PoolStats() *redis.PoolStats +} + +func (b *Broker) statsMeter() { + go func() { + ticker := time.NewTicker(DefaultMeterStatsInterval) + defer ticker.Stop() + + for { + select { + case <-b.done: + return + case <-ticker.C: + if b.cli == nil { + return + } + stats := b.cli.PoolStats() + b.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits)) + b.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses)) + b.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts)) + b.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns)) + b.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns)) + b.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns)) + } + } + }() +} diff --git a/tracer.go b/tracer.go new file mode 100644 index 0000000..bddbcaa --- /dev/null +++ b/tracer.go @@ -0,0 +1,128 @@ +package redis + +import ( + "context" + "fmt" + "net" + "strconv" + + rediscmd "github.com/redis/go-redis/extra/rediscmd/v9" + "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/tracer" +) + +func setTracing(rdb redis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) { + switch rdb := rdb.(type) { + case *redis.Client: + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + case *redis.ClusterClient: + rdb.OnNewNode(func(rdb *redis.Client) { + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + }) + case *redis.Ring: + rdb.OnNewNode(func(rdb *redis.Client) { + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + }) + } +} + +type tracingHook struct { + tr tracer.Tracer + opts []tracer.SpanOption +} + +var _ redis.Hook = (*tracingHook)(nil) + +func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook { + opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient)) + if connString != "" { + opts = append(opts, tracer.WithSpanLabels("db.connection_string", connString)) + } + + return &tracingHook{ + tr: tr, + opts: opts, + } +} + +func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + /* + _, span := h.tr.Start(ctx, "redis.dial", h.opts...) + defer span.Finish() + */ + conn, err := hook(ctx, network, addr) + // recordError(span, err) + + return conn, err + } +} + +func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook { + return func(ctx context.Context, cmd redis.Cmder) error { + cmdString := rediscmd.CmdString(cmd) + var err error + + switch cmdString { + case "cluster slots": + break + default: + _, span := h.tr.Start(ctx, "redis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) + defer func() { + recordError(span, err) + span.Finish() + }() + } + + err = hook(ctx, cmd) + + return err + } +} + +func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook { + return func(ctx context.Context, cmds []redis.Cmder) error { + _, cmdsString := rediscmd.CmdsString(cmds) + + opts := append(h.opts, tracer.WithSpanLabels( + "db.redis.num_cmd", strconv.Itoa(len(cmds)), + "db.statement", cmdsString, + )) + + _, span := h.tr.Start(ctx, "redis.process_pipeline", opts...) + defer span.Finish() + + err := hook(ctx, cmds) + recordError(span, err) + + return err + } +} + +func setSpanError(ctx context.Context, err error) { + if err == nil || err == redis.Nil { + return + } + if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +func recordError(span tracer.Span, err error) { + if err != nil && err != redis.Nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +func formatDBConnString(network, addr string) string { + if network == "tcp" { + network = "redis" + } + return fmt.Sprintf("%s://%s", network, addr) +}