initial import

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-09-09 18:36:04 +03:00
commit 9ddd01b5d7
6 changed files with 614 additions and 0 deletions

13
go.mod Normal file
View File

@ -0,0 +1,13 @@
module go.unistack.org/micro-broker-redis/v3
go 1.23.1
require (
github.com/redis/go-redis/v9 v9.6.1
go.unistack.org/micro/v3 v3.10.84
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)

12
go.sum Normal file
View File

@ -0,0 +1,12 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4=
go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=

90
options.go Normal file
View File

@ -0,0 +1,90 @@
package redis
import (
"time"
"github.com/redis/go-redis/v9"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/store"
"go.unistack.org/micro/v3/tracer"
)
type configKey struct{}
func Config(c *redis.Options) store.Option {
return store.SetOption(configKey{}, c)
}
type clusterConfigKey struct{}
func ClusterConfig(c *redis.ClusterOptions) store.Option {
return store.SetOption(clusterConfigKey{}, c)
}
var (
// DefaultSubscribeMaxInflight specifies how much messages keep inflight
DefaultSubscribeMaxInflight = 100
// DefaultMeterStatsInterval holds default stats interval
DefaultMeterStatsInterval = 5 * time.Second
// DefaultMeterMetricPrefix holds default metric prefix
DefaultMeterMetricPrefix = "micro_broker_"
)
// Options struct holds wrapper options
type Options struct {
Logger logger.Logger
Meter meter.Meter
Tracer tracer.Tracer
MeterMetricPrefix string
MeterStatsInterval time.Duration
}
// Option func signature
type Option func(*Options)
// NewOptions create new Options struct from provided option slice
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
MeterStatsInterval: DefaultMeterStatsInterval,
MeterMetricPrefix: DefaultMeterMetricPrefix,
}
for _, o := range opts {
o(&options)
}
options.Meter = options.Meter.Clone(
meter.MetricPrefix(options.MeterMetricPrefix),
)
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
return options
}
// MetricInterval specifies stats interval for *sql.DB
func MetricInterval(td time.Duration) Option {
return func(o *Options) {
o.MeterStatsInterval = td
}
}
// MetricPrefix specifies prefix for each metric
func MetricPrefix(pref string) Option {
return func(o *Options) {
o.MeterMetricPrefix = pref
}
}
type subscribeMaxInflightKey struct{}
// SubscribeMaxInFlight max queued messages
func SubscribeMaxInFlight(n int) broker.SubscribeOption {
return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
}

322
redis.go Normal file
View File

@ -0,0 +1,322 @@
package redis
import (
"context"
"fmt"
"strings"
"time"
"github.com/redis/go-redis/v9"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/semconv"
)
var DefaultOptions = &redis.UniversalOptions{
Username: "",
Password: "", // no password set
DB: 0, // use default DB
MaxRetries: 2,
MaxRetryBackoff: 256 * time.Millisecond,
DialTimeout: 1 * time.Second,
ReadTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
PoolTimeout: 1 * time.Second,
MinIdleConns: 10,
}
var (
_ broker.Broker = (*Broker)(nil)
_ broker.Event = (*Event)(nil)
_ broker.Subscriber = (*Subscriber)(nil)
)
// Event is an broker.Event
type Event struct {
ctx context.Context
topic string
msg *broker.Message
err error
}
// Topic returns the topic this Event applies to.
func (p *Event) Context() context.Context {
return p.ctx
}
// Topic returns the topic this Event
func (p *Event) Topic() string {
return p.topic
}
// Message returns the broker message
func (p *Event) Message() *broker.Message {
return p.msg
}
// Ack sends an acknowledgement to the broker. However this is not supported
// is Redis and therefore this is a no-op.
func (p *Event) Ack() error {
return nil
}
func (p *Event) Error() error {
return p.err
}
func (p *Event) SetError(err error) {
p.err = err
}
// Subscriber implements broker.Subscriber interface
type Subscriber struct {
ctx context.Context
done chan struct{}
sub *redis.PubSub
topic string
handle broker.Handler
opts broker.Options
sopts broker.SubscribeOptions
}
// recv loops to receive new messages from Redis and handle them
// as Events.
func (s *Subscriber) loop() {
maxInflight := DefaultSubscribeMaxInflight
if s.opts.Context != nil {
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
maxInflight = n
}
}
eh := s.opts.ErrorHandler
if s.sopts.ErrorHandler != nil {
eh = s.sopts.ErrorHandler
}
for {
select {
case <-s.done:
return
case msg := <-s.sub.Channel(redis.WithChannelSize(maxInflight)):
p := &Event{
topic: msg.Channel,
msg: &broker.Message{},
}
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
if err != nil {
p.msg.Body = codec.RawMessage(msg.Payload)
if eh != nil {
_ = eh(p)
continue
}
s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("codec.Unmarshal error %v", err))
}
if p.err = s.handle(p); p.err != nil {
if eh != nil {
_ = eh(p)
continue
}
s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("handle error %v", err))
}
if s.sopts.AutoAck {
if err := p.Ack(); err != nil {
s.opts.Logger.Fatal(s.ctx, "auto ack error", err)
}
}
}
}
}
// Options returns the Subscriber options.
func (s *Subscriber) Options() broker.SubscribeOptions {
return s.sopts
}
// Topic returns the topic of the Subscriber.
func (s *Subscriber) Topic() string {
return s.topic
}
// Unsubscribe unsubscribes the Subscriber and frees the connection.
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
return s.sub.Unsubscribe(ctx, s.topic)
}
// Broker implements broker.Broker interface
type Broker struct {
opts broker.Options
cli redis.UniversalClient
done chan struct{}
}
// String returns the name of the broker implementation
func (b *Broker) String() string {
return "redis"
}
// Name returns the name of the broker
func (b *Broker) Name() string {
return b.opts.Name
}
// Options returns the broker.Broker Options
func (b *Broker) Options() broker.Options {
return b.opts
}
// Address returns the address the broker will use to create new connections
func (b *Broker) Address() string {
return strings.Join(b.opts.Addrs, ",")
}
func (b *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
return b.publish(ctx, msgs, opts...)
}
func (b *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
msg.Header.Set(metadata.HeaderTopic, topic)
return b.publish(ctx, []*broker.Message{msg}, opts...)
}
func (b *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
options := broker.NewPublishOptions(opts...)
for _, msg := range msgs {
var record string
topic, _ := msg.Header.Get(metadata.HeaderTopic)
msg.Header.Del(metadata.HeaderTopic)
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Inc()
if options.BodyOnly || b.opts.Codec.String() == "noop" {
record = string(msg.Body)
} else {
buf, err := b.opts.Codec.Marshal(msg)
if err != nil {
return err
}
record = string(buf)
}
ts := time.Now()
if err := b.cli.Publish(ctx, topic, record); err != nil {
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "failure").Inc()
} else {
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "success").Inc()
}
te := time.Since(ts)
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Dec()
}
return nil
}
// Subscribe returns a broker.BatchSubscriber for the topic and handler
func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
return nil, nil
}
// Subscribe returns a broker.Subscriber for the topic and handler
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
s := &Subscriber{
topic: topic,
handle: handler,
opts: b.opts,
sopts: broker.NewSubscribeOptions(opts...),
done: make(chan struct{}),
}
// Run the receiver routine.
go s.loop()
s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err
}
return s, nil
}
func (b *Broker) configure() error {
redisOptions := DefaultOptions
if b.cli != nil && b.opts.Context == nil {
return nil
}
if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
redisOptions = c
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
}
}
if redisOptions == nil && b.cli != nil {
return nil
}
if redisOptions == nil {
redisOptions.Addrs = b.opts.Addrs
redisOptions.TLSConfig = b.opts.TLSConfig
}
c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer)
b.statsMeter()
return nil
}
func (b *Broker) Connect(ctx context.Context) error {
var err error
if b.cli != nil {
err = b.cli.Ping(ctx).Err()
}
setSpanError(ctx, err)
return err
}
func (b *Broker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&b.opts)
}
err := b.configure()
if err != nil {
return err
}
return nil
}
func (b *Broker) Client() redis.UniversalClient {
if b.cli != nil {
return b.cli
}
return nil
}
func (b *Broker) Disconnect(ctx context.Context) error {
var err error
select {
case <-b.done:
return err
default:
if b.cli != nil {
err = b.cli.Close()
}
close(b.done)
return err
}
}
func NewBroker(opts ...broker.Option) *Broker {
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
}

49
stats.go Normal file
View File

@ -0,0 +1,49 @@
package redis
import (
"time"
"github.com/redis/go-redis/v9"
)
var (
PoolHitsTotal = "pool_hits_total"
PoolMissesTotal = "pool_misses_total"
PoolTimeoutTotal = "pool_timeout_total"
PoolConnTotalCurrent = "pool_conn_total_current"
PoolConnIdleCurrent = "pool_conn_idle_current"
PoolConnStaleTotal = "pool_conn_stale_total"
meterRequestTotal = "request_total"
meterRequestLatencyMicroseconds = "latency_microseconds"
meterRequestDurationSeconds = "request_duration_seconds"
)
type Statser interface {
PoolStats() *redis.PoolStats
}
func (b *Broker) statsMeter() {
go func() {
ticker := time.NewTicker(DefaultMeterStatsInterval)
defer ticker.Stop()
for {
select {
case <-b.done:
return
case <-ticker.C:
if b.cli == nil {
return
}
stats := b.cli.PoolStats()
b.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits))
b.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses))
b.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts))
b.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns))
b.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns))
b.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns))
}
}
}()
}

128
tracer.go Normal file
View File

@ -0,0 +1,128 @@
package redis
import (
"context"
"fmt"
"net"
"strconv"
rediscmd "github.com/redis/go-redis/extra/rediscmd/v9"
"github.com/redis/go-redis/v9"
"go.unistack.org/micro/v3/tracer"
)
func setTracing(rdb redis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) {
switch rdb := rdb.(type) {
case *redis.Client:
opt := rdb.Options()
connString := formatDBConnString(opt.Network, opt.Addr)
rdb.AddHook(newTracingHook(connString, tr))
case *redis.ClusterClient:
rdb.OnNewNode(func(rdb *redis.Client) {
opt := rdb.Options()
connString := formatDBConnString(opt.Network, opt.Addr)
rdb.AddHook(newTracingHook(connString, tr))
})
case *redis.Ring:
rdb.OnNewNode(func(rdb *redis.Client) {
opt := rdb.Options()
connString := formatDBConnString(opt.Network, opt.Addr)
rdb.AddHook(newTracingHook(connString, tr))
})
}
}
type tracingHook struct {
tr tracer.Tracer
opts []tracer.SpanOption
}
var _ redis.Hook = (*tracingHook)(nil)
func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook {
opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient))
if connString != "" {
opts = append(opts, tracer.WithSpanLabels("db.connection_string", connString))
}
return &tracingHook{
tr: tr,
opts: opts,
}
}
func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
/*
_, span := h.tr.Start(ctx, "redis.dial", h.opts...)
defer span.Finish()
*/
conn, err := hook(ctx, network, addr)
// recordError(span, err)
return conn, err
}
}
func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
cmdString := rediscmd.CmdString(cmd)
var err error
switch cmdString {
case "cluster slots":
break
default:
_, span := h.tr.Start(ctx, "redis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...)
defer func() {
recordError(span, err)
span.Finish()
}()
}
err = hook(ctx, cmd)
return err
}
}
func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
_, cmdsString := rediscmd.CmdsString(cmds)
opts := append(h.opts, tracer.WithSpanLabels(
"db.redis.num_cmd", strconv.Itoa(len(cmds)),
"db.statement", cmdsString,
))
_, span := h.tr.Start(ctx, "redis.process_pipeline", opts...)
defer span.Finish()
err := hook(ctx, cmds)
recordError(span, err)
return err
}
}
func setSpanError(ctx context.Context, err error) {
if err == nil || err == redis.Nil {
return
}
if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
}
func recordError(span tracer.Span, err error) {
if err != nil && err != redis.Nil {
span.SetStatus(tracer.SpanStatusError, err.Error())
}
}
func formatDBConnString(network, addr string) string {
if network == "tcp" {
network = "redis"
}
return fmt.Sprintf("%s://%s", network, addr)
}