initial import
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
commit
9ee0a41ddb
13
go.mod
Normal file
13
go.mod
Normal file
@ -0,0 +1,13 @@
|
||||
module go.unistack.org/micro-broker-redis/v3
|
||||
|
||||
go 1.23.1
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
go.unistack.org/micro/v3 v3.10.84
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
)
|
12
go.sum
Normal file
12
go.sum
Normal file
@ -0,0 +1,12 @@
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
|
||||
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
|
||||
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4=
|
||||
go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
90
options.go
Normal file
90
options.go
Normal file
@ -0,0 +1,90 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/store"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
type configKey struct{}
|
||||
|
||||
func Config(c *redis.Options) store.Option {
|
||||
return store.SetOption(configKey{}, c)
|
||||
}
|
||||
|
||||
type clusterConfigKey struct{}
|
||||
|
||||
func ClusterConfig(c *redis.ClusterOptions) store.Option {
|
||||
return store.SetOption(clusterConfigKey{}, c)
|
||||
}
|
||||
|
||||
var (
|
||||
// DefaultSubscribeMaxInflight specifies how much messages keep inflight
|
||||
DefaultSubscribeMaxInflight = 100
|
||||
|
||||
// DefaultMeterStatsInterval holds default stats interval
|
||||
DefaultMeterStatsInterval = 5 * time.Second
|
||||
// DefaultMeterMetricPrefix holds default metric prefix
|
||||
DefaultMeterMetricPrefix = "micro_broker_"
|
||||
)
|
||||
|
||||
// Options struct holds wrapper options
|
||||
type Options struct {
|
||||
Logger logger.Logger
|
||||
Meter meter.Meter
|
||||
Tracer tracer.Tracer
|
||||
MeterMetricPrefix string
|
||||
MeterStatsInterval time.Duration
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
type Option func(*Options)
|
||||
|
||||
// NewOptions create new Options struct from provided option slice
|
||||
func NewOptions(opts ...Option) Options {
|
||||
options := Options{
|
||||
Logger: logger.DefaultLogger,
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
MeterStatsInterval: DefaultMeterStatsInterval,
|
||||
MeterMetricPrefix: DefaultMeterMetricPrefix,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
options.Meter = options.Meter.Clone(
|
||||
meter.MetricPrefix(options.MeterMetricPrefix),
|
||||
)
|
||||
|
||||
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// MetricInterval specifies stats interval for *sql.DB
|
||||
func MetricInterval(td time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.MeterStatsInterval = td
|
||||
}
|
||||
}
|
||||
|
||||
// MetricPrefix specifies prefix for each metric
|
||||
func MetricPrefix(pref string) Option {
|
||||
return func(o *Options) {
|
||||
o.MeterMetricPrefix = pref
|
||||
}
|
||||
}
|
||||
|
||||
type subscribeMaxInflightKey struct{}
|
||||
|
||||
// SubscribeMaxInFlight max queued messages
|
||||
func SubscribeMaxInFlight(n int) broker.SubscribeOption {
|
||||
return broker.SetSubscribeOption(subscribeMaxInflightKey{}, n)
|
||||
}
|
322
redis.go
Normal file
322
redis.go
Normal file
@ -0,0 +1,322 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/semconv"
|
||||
)
|
||||
|
||||
var DefaultOptions = &redis.UniversalOptions{
|
||||
Username: "",
|
||||
Password: "", // no password set
|
||||
DB: 0, // use default DB
|
||||
MaxRetries: 2,
|
||||
MaxRetryBackoff: 256 * time.Millisecond,
|
||||
DialTimeout: 1 * time.Second,
|
||||
ReadTimeout: 1 * time.Second,
|
||||
WriteTimeout: 1 * time.Second,
|
||||
PoolTimeout: 1 * time.Second,
|
||||
MinIdleConns: 10,
|
||||
}
|
||||
|
||||
var (
|
||||
_ broker.Broker = (*Broker)(nil)
|
||||
_ broker.Event = (*Event)(nil)
|
||||
_ broker.Subscriber = (*Subscriber)(nil)
|
||||
)
|
||||
|
||||
// Event is an broker.Event
|
||||
type Event struct {
|
||||
ctx context.Context
|
||||
topic string
|
||||
msg *broker.Message
|
||||
err error
|
||||
}
|
||||
|
||||
// Topic returns the topic this Event applies to.
|
||||
func (p *Event) Context() context.Context {
|
||||
return p.ctx
|
||||
}
|
||||
|
||||
// Topic returns the topic this Event
|
||||
func (p *Event) Topic() string {
|
||||
return p.topic
|
||||
}
|
||||
|
||||
// Message returns the broker message
|
||||
func (p *Event) Message() *broker.Message {
|
||||
return p.msg
|
||||
}
|
||||
|
||||
// Ack sends an acknowledgement to the broker. However this is not supported
|
||||
// is Redis and therefore this is a no-op.
|
||||
func (p *Event) Ack() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Event) Error() error {
|
||||
return p.err
|
||||
}
|
||||
|
||||
func (p *Event) SetError(err error) {
|
||||
p.err = err
|
||||
}
|
||||
|
||||
// Subscriber implements broker.Subscriber interface
|
||||
type Subscriber struct {
|
||||
ctx context.Context
|
||||
done chan struct{}
|
||||
sub *redis.PubSub
|
||||
topic string
|
||||
handle broker.Handler
|
||||
opts broker.Options
|
||||
sopts broker.SubscribeOptions
|
||||
}
|
||||
|
||||
// recv loops to receive new messages from Redis and handle them
|
||||
// as Events.
|
||||
func (s *Subscriber) loop() {
|
||||
maxInflight := DefaultSubscribeMaxInflight
|
||||
if s.opts.Context != nil {
|
||||
if n, ok := s.opts.Context.Value(subscribeMaxInflightKey{}).(int); n > 0 && ok {
|
||||
maxInflight = n
|
||||
}
|
||||
}
|
||||
|
||||
eh := s.opts.ErrorHandler
|
||||
if s.sopts.ErrorHandler != nil {
|
||||
eh = s.sopts.ErrorHandler
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
return
|
||||
case msg := <-s.sub.Channel(redis.WithChannelSize(maxInflight)):
|
||||
p := &Event{
|
||||
topic: msg.Channel,
|
||||
msg: &broker.Message{},
|
||||
}
|
||||
|
||||
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
||||
if err != nil {
|
||||
p.msg.Body = codec.RawMessage(msg.Payload)
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
continue
|
||||
}
|
||||
s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("codec.Unmarshal error %v", err))
|
||||
}
|
||||
|
||||
if p.err = s.handle(p); p.err != nil {
|
||||
if eh != nil {
|
||||
_ = eh(p)
|
||||
continue
|
||||
}
|
||||
s.opts.Logger.Fatal(s.ctx, fmt.Sprintf("handle error %v", err))
|
||||
|
||||
}
|
||||
|
||||
if s.sopts.AutoAck {
|
||||
if err := p.Ack(); err != nil {
|
||||
s.opts.Logger.Fatal(s.ctx, "auto ack error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Options returns the Subscriber options.
|
||||
func (s *Subscriber) Options() broker.SubscribeOptions {
|
||||
return s.sopts
|
||||
}
|
||||
|
||||
// Topic returns the topic of the Subscriber.
|
||||
func (s *Subscriber) Topic() string {
|
||||
return s.topic
|
||||
}
|
||||
|
||||
// Unsubscribe unsubscribes the Subscriber and frees the connection.
|
||||
func (s *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||
return s.sub.Unsubscribe(ctx, s.topic)
|
||||
}
|
||||
|
||||
// Broker implements broker.Broker interface
|
||||
type Broker struct {
|
||||
opts broker.Options
|
||||
cli redis.UniversalClient
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// String returns the name of the broker implementation
|
||||
func (b *Broker) String() string {
|
||||
return "redis"
|
||||
}
|
||||
|
||||
// Name returns the name of the broker
|
||||
func (b *Broker) Name() string {
|
||||
return b.opts.Name
|
||||
}
|
||||
|
||||
// Options returns the broker.Broker Options
|
||||
func (b *Broker) Options() broker.Options {
|
||||
return b.opts
|
||||
}
|
||||
|
||||
// Address returns the address the broker will use to create new connections
|
||||
func (b *Broker) Address() string {
|
||||
return strings.Join(b.opts.Addrs, ",")
|
||||
}
|
||||
|
||||
func (b *Broker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
return b.publish(ctx, msgs, opts...)
|
||||
}
|
||||
|
||||
func (b *Broker) Publish(ctx context.Context, topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
||||
msg.Header.Set(metadata.HeaderTopic, topic)
|
||||
return b.publish(ctx, []*broker.Message{msg}, opts...)
|
||||
}
|
||||
|
||||
func (b *Broker) publish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
|
||||
options := broker.NewPublishOptions(opts...)
|
||||
|
||||
for _, msg := range msgs {
|
||||
var record string
|
||||
topic, _ := msg.Header.Get(metadata.HeaderTopic)
|
||||
msg.Header.Del(metadata.HeaderTopic)
|
||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Inc()
|
||||
if options.BodyOnly || b.opts.Codec.String() == "noop" {
|
||||
record = string(msg.Body)
|
||||
} else {
|
||||
buf, err := b.opts.Codec.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
record = string(buf)
|
||||
}
|
||||
ts := time.Now()
|
||||
if err := b.cli.Publish(ctx, topic, record); err != nil {
|
||||
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "failure").Inc()
|
||||
} else {
|
||||
b.opts.Meter.Counter(semconv.PublishMessageTotal, "endpoint", topic, "topic", topic, "status", "success").Inc()
|
||||
}
|
||||
te := time.Since(ts)
|
||||
b.opts.Meter.Summary(semconv.PublishMessageLatencyMicroseconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
|
||||
b.opts.Meter.Histogram(semconv.PublishMessageDurationSeconds, "endpoint", topic, "topic", topic).Update(te.Seconds())
|
||||
b.opts.Meter.Counter(semconv.PublishMessageInflight, "endpoint", topic, "topic", topic).Dec()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe returns a broker.BatchSubscriber for the topic and handler
|
||||
func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Subscribe returns a broker.Subscriber for the topic and handler
|
||||
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
s := &Subscriber{
|
||||
topic: topic,
|
||||
handle: handler,
|
||||
opts: b.opts,
|
||||
sopts: broker.NewSubscribeOptions(opts...),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Run the receiver routine.
|
||||
go s.loop()
|
||||
|
||||
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
||||
if err := s.sub.Ping(ctx, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (b *Broker) configure() error {
|
||||
redisOptions := DefaultOptions
|
||||
|
||||
if b.cli != nil && b.opts.Context == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if b.opts.Context != nil {
|
||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
|
||||
redisOptions = c
|
||||
if b.opts.TLSConfig != nil {
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if redisOptions == nil && b.cli != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if redisOptions == nil {
|
||||
redisOptions.Addrs = b.opts.Addrs
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
|
||||
c := redis.NewUniversalClient(redisOptions)
|
||||
setTracing(c, b.opts.Tracer)
|
||||
|
||||
b.statsMeter()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Broker) Connect(ctx context.Context) error {
|
||||
var err error
|
||||
if b.cli != nil {
|
||||
err = b.cli.Ping(ctx).Err()
|
||||
}
|
||||
setSpanError(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *Broker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&b.opts)
|
||||
}
|
||||
|
||||
err := b.configure()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Broker) Client() redis.UniversalClient {
|
||||
if b.cli != nil {
|
||||
return b.cli
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Broker) Disconnect(ctx context.Context) error {
|
||||
var err error
|
||||
select {
|
||||
case <-b.done:
|
||||
return err
|
||||
default:
|
||||
if b.cli != nil {
|
||||
err = b.cli.Close()
|
||||
}
|
||||
close(b.done)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func NewBroker(opts ...broker.Option) *Broker {
|
||||
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
|
||||
}
|
49
stats.go
Normal file
49
stats.go
Normal file
@ -0,0 +1,49 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var (
|
||||
PoolHitsTotal = "pool_hits_total"
|
||||
PoolMissesTotal = "pool_misses_total"
|
||||
PoolTimeoutTotal = "pool_timeout_total"
|
||||
PoolConnTotalCurrent = "pool_conn_total_current"
|
||||
PoolConnIdleCurrent = "pool_conn_idle_current"
|
||||
PoolConnStaleTotal = "pool_conn_stale_total"
|
||||
|
||||
meterRequestTotal = "request_total"
|
||||
meterRequestLatencyMicroseconds = "latency_microseconds"
|
||||
meterRequestDurationSeconds = "request_duration_seconds"
|
||||
)
|
||||
|
||||
type Statser interface {
|
||||
PoolStats() *redis.PoolStats
|
||||
}
|
||||
|
||||
func (b *Broker) statsMeter() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(DefaultMeterStatsInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-b.done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if b.cli == nil {
|
||||
return
|
||||
}
|
||||
stats := b.cli.PoolStats()
|
||||
b.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits))
|
||||
b.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses))
|
||||
b.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts))
|
||||
b.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns))
|
||||
b.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns))
|
||||
b.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
128
tracer.go
Normal file
128
tracer.go
Normal file
@ -0,0 +1,128 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
||||
rediscmd "github.com/redis/go-redis/extra/rediscmd/v9"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
func setTracing(rdb redis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) {
|
||||
switch rdb := rdb.(type) {
|
||||
case *redis.Client:
|
||||
opt := rdb.Options()
|
||||
connString := formatDBConnString(opt.Network, opt.Addr)
|
||||
rdb.AddHook(newTracingHook(connString, tr))
|
||||
case *redis.ClusterClient:
|
||||
rdb.OnNewNode(func(rdb *redis.Client) {
|
||||
opt := rdb.Options()
|
||||
connString := formatDBConnString(opt.Network, opt.Addr)
|
||||
rdb.AddHook(newTracingHook(connString, tr))
|
||||
})
|
||||
case *redis.Ring:
|
||||
rdb.OnNewNode(func(rdb *redis.Client) {
|
||||
opt := rdb.Options()
|
||||
connString := formatDBConnString(opt.Network, opt.Addr)
|
||||
rdb.AddHook(newTracingHook(connString, tr))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type tracingHook struct {
|
||||
tr tracer.Tracer
|
||||
opts []tracer.SpanOption
|
||||
}
|
||||
|
||||
var _ redis.Hook = (*tracingHook)(nil)
|
||||
|
||||
func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook {
|
||||
opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient))
|
||||
if connString != "" {
|
||||
opts = append(opts, tracer.WithSpanLabels("db.connection_string", connString))
|
||||
}
|
||||
|
||||
return &tracingHook{
|
||||
tr: tr,
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
|
||||
return func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
/*
|
||||
_, span := h.tr.Start(ctx, "redis.dial", h.opts...)
|
||||
defer span.Finish()
|
||||
*/
|
||||
conn, err := hook(ctx, network, addr)
|
||||
// recordError(span, err)
|
||||
|
||||
return conn, err
|
||||
}
|
||||
}
|
||||
|
||||
func (h *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
|
||||
return func(ctx context.Context, cmd redis.Cmder) error {
|
||||
cmdString := rediscmd.CmdString(cmd)
|
||||
var err error
|
||||
|
||||
switch cmdString {
|
||||
case "cluster slots":
|
||||
break
|
||||
default:
|
||||
_, span := h.tr.Start(ctx, "redis.process", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...)
|
||||
defer func() {
|
||||
recordError(span, err)
|
||||
span.Finish()
|
||||
}()
|
||||
}
|
||||
|
||||
err = hook(ctx, cmd)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (h *tracingHook) ProcessPipelineHook(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
|
||||
return func(ctx context.Context, cmds []redis.Cmder) error {
|
||||
_, cmdsString := rediscmd.CmdsString(cmds)
|
||||
|
||||
opts := append(h.opts, tracer.WithSpanLabels(
|
||||
"db.redis.num_cmd", strconv.Itoa(len(cmds)),
|
||||
"db.statement", cmdsString,
|
||||
))
|
||||
|
||||
_, span := h.tr.Start(ctx, "redis.process_pipeline", opts...)
|
||||
defer span.Finish()
|
||||
|
||||
err := hook(ctx, cmds)
|
||||
recordError(span, err)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func setSpanError(ctx context.Context, err error) {
|
||||
if err == nil || err == redis.Nil {
|
||||
return
|
||||
}
|
||||
if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func recordError(span tracer.Span, err error) {
|
||||
if err != nil && err != redis.Nil {
|
||||
span.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func formatDBConnString(network, addr string) string {
|
||||
if network == "tcp" {
|
||||
network = "redis"
|
||||
}
|
||||
return fmt.Sprintf("%s://%s", network, addr)
|
||||
}
|
Loading…
Reference in New Issue
Block a user