From 74f42ab78db84f510e4179d8ceca858566df6664 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 28 Dec 2024 16:50:42 +0300 Subject: [PATCH] initial import Signed-off-by: Vasiliy Tolstov --- go.mod | 46 ++++++ go.sum | 151 +++++++++++++++++++ options.go | 30 ++++ redis.go | 393 ++++++++++++++++++++++++++++++++++++++++++++++++++ redis_test.go | 188 ++++++++++++++++++++++++ stats.go | 49 +++++++ tracer.go | 128 ++++++++++++++++ 7 files changed, 985 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 options.go create mode 100644 redis.go create mode 100644 redis_test.go create mode 100644 stats.go create mode 100644 tracer.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a7c7925 --- /dev/null +++ b/go.mod @@ -0,0 +1,46 @@ +module go.unistack.org/micro-register-redis/v3 + +go 1.23.4 + +require ( + github.com/alicebob/miniredis/v2 v2.34.0 + github.com/docker/docker v27.4.1+incompatible + github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 + github.com/redis/go-redis/v9 v9.7.0 + go.unistack.org/micro/v3 v3.11.35 +) + +require ( + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect + github.com/ash3in/uuidv8 v1.2.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/distribution v2.8.3+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect + github.com/docker/go-metrics v0.0.1 // indirect + github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/mux v1.8.1 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/matoous/go-nanoid v1.5.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_golang v1.1.0 // indirect + github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 // indirect + github.com/prometheus/common v0.6.0 // indirect + github.com/prometheus/procfs v0.0.3 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect + go.unistack.org/micro-proto/v3 v3.4.1 // indirect + golang.org/x/sys v0.28.0 // indirect + google.golang.org/protobuf v1.36.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + gotest.tools/v3 v3.5.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8dc7c07 --- /dev/null +++ b/go.sum @@ -0,0 +1,151 @@ +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0= +github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8= +github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= +github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= +github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker v27.4.1+incompatible h1:ZJvcY7gfwHn1JF48PfbyXg7Jyt9ZCWDW+GGXOIxEwp4= +github.com/docker/docker v27.4.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQV8= +github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= +github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7 h1:UhxFibDNY/bfvqU5CAUmr9zpesgbU6SWc8/B4mflAE4= +github.com/docker/libtrust v0.0.0-20160708172513-aabc10ec26b7/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= +github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.1.0 h1:BQ53HtBmfOitExawJ6LokA4x8ov/z0SYYb0+HxJfRI8= +github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= +github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURmKE= +github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk= +github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= +go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= +go.unistack.org/micro/v3 v3.11.34 h1:kwAo09FLcWrAwDSWSnyegQGTJT5oz4cpb/LN7uw5imQ= +go.unistack.org/micro/v3 v3.11.34/go.mod h1:POGU5hstnAT9LH70m8FalyQSNi2GfIew71K75JenIZk= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/options.go b/options.go new file mode 100644 index 0000000..94bb8fe --- /dev/null +++ b/options.go @@ -0,0 +1,30 @@ +package redis + +import ( + "time" + + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/register" +) + +var ( + DefaultSeparator = "/" + DefaultOptions = &goredis.UniversalOptions{ + Username: "", + Password: "", // no password set + DB: 0, // use default DB + MaxRetries: 2, + MaxRetryBackoff: 256 * time.Millisecond, + DialTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + PoolTimeout: 1 * time.Second, + MinIdleConns: 10, + } +) + +type configKey struct{} + +func Config(c *goredis.UniversalOptions) register.Option { + return register.SetOption(configKey{}, c) +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..b01bb5a --- /dev/null +++ b/redis.go @@ -0,0 +1,393 @@ +package redis + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/docker/docker/registry" + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/codec" + "go.unistack.org/micro/v3/register" + "go.unistack.org/micro/v3/util/id" + pool "go.unistack.org/micro/v3/util/xpool" +) + +const ( + separator = "/" +) + +var ( + _ register.Register = (*Redis)(nil) + _ register.Watcher = (*Watcher)(nil) +) + +type Redis struct { + cli goredis.UniversalClient + opts register.Options + watchers map[string]*Watcher + services map[string][]*registry.Service + db int + pool *pool.StringsPool + sync.RWMutex +} + +// NewRegister returns an initialized in-memory register +func NewRegister(opts ...register.Option) *Redis { + r := &Redis{ + opts: register.NewOptions(opts...), + watchers: make(map[string]*Watcher), + services: make(map[string][]*registry.Service), + } + + return r +} + +func (r *Redis) Connect(ctx context.Context) error { + return nil +} + +func (r *Redis) Disconnect(ctx context.Context) error { + return nil +} + +func (r *Redis) Init(opts ...register.Option) error { + for _, o := range opts { + o(&r.opts) + } + + redisOptions := DefaultOptions + + if r.opts.Context != nil { + if c, ok := r.opts.Context.Value(configKey{}).(*goredis.UniversalOptions); ok && c != nil { + redisOptions = c + } + } + + if len(r.opts.Addrs) > 0 { + redisOptions.Addrs = r.opts.Addrs + } + + if r.opts.TLSConfig != nil { + redisOptions.TLSConfig = r.opts.TLSConfig + } + + c := goredis.NewUniversalClient(redisOptions) + setTracing(c, r.opts.Tracer) + r.pool = pool.NewStringsPool(50) + r.db = redisOptions.DB + r.cli = c + r.statsMeter() + + return nil +} + +func (r *Redis) Options() register.Options { + return r.opts +} + +func (r *Redis) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error { + options := register.NewRegisterOptions(opts...) + b := r.pool.Get() + defer func() { + b.Reset() + r.pool.Put(b) + }() + + for _, n := range s.Nodes { + buf, err := r.opts.Codec.Marshal(n) + if err != nil { + return err + } + if err = r.cli.Set(ctx, r.getKey(b, options.Namespace, s.Name, s.Version, n.ID), buf, options.TTL).Err(); err != nil { + return err + } + } + + evt := ®ister.Event{ + Timestamp: time.Now(), + Type: register.EventCreate, + Service: s, + ID: id.MustNew(), + } + + buf, err := r.opts.Codec.Marshal(evt) + if err != nil { + return err + } + + if err = r.cli.Publish(ctx, options.Namespace+separator+"events", buf).Err(); err != nil { + return err + } + + return nil +} + +func (r *Redis) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error { + options := register.NewDeregisterOptions(opts...) + b := r.pool.Get() + defer func() { + b.Reset() + r.pool.Put(b) + }() + var err error + for _, n := range s.Nodes { + if err = r.cli.Del(ctx, r.getKey(b, options.Namespace, s.Name, s.Version, n.ID)).Err(); err != nil && err != goredis.Nil { + return err + } + } + + evt := ®ister.Event{ + Timestamp: time.Now(), + Type: register.EventDelete, + Service: s, + ID: id.MustNew(), + } + + buf, err := r.opts.Codec.Marshal(evt) + if err != nil { + return err + } + + if err = r.cli.Publish(ctx, options.Namespace+separator+"events", buf).Err(); err != nil { + return err + } + + return nil +} + +func (r *Redis) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) { + options := register.NewLookupOptions(opts...) + b := r.pool.Get() + defer func() { + b.Reset() + r.pool.Put(b) + }() + + keys, err := r.cli.Keys(ctx, r.getKey(b, options.Namespace, name, "*", "")).Result() + if err != nil { + return nil, err + } + if len(keys) == 0 { + return nil, register.ErrNotFound + } + + vals, err := r.cli.MGet(ctx, keys...).Result() + if err != nil { + return nil, err + } + if len(vals) == 0 { + return nil, nil + } + + servicesMap := make(map[string]*register.Service) + for idx := range keys { + eidx := strings.LastIndex(keys[idx], separator) + sidx := strings.LastIndex(keys[idx][:eidx], separator) + if string(keys[idx][sidx]) == separator { + sidx++ + } + name := keys[idx][sidx:eidx] + svc, ok := servicesMap[name] + if !ok { + p := strings.Split(name, "-") + svc = ®ister.Service{Name: p[0], Version: p[1]} + } + + switch v := vals[idx].(type) { + case string: + node := ®ister.Node{} + if err = r.opts.Codec.Unmarshal([]byte(v), node); err != nil { + return nil, err + } + svc.Nodes = append(svc.Nodes, node) + case []byte: + node := ®ister.Node{} + if err = r.opts.Codec.Unmarshal(v, node); err != nil { + return nil, err + } + svc.Nodes = append(svc.Nodes, node) + } + + servicesMap[name] = svc + } + + svcs := make([]*register.Service, 0, len(servicesMap)) + for _, svc := range servicesMap { + svcs = append(svcs, svc) + } + + return svcs, nil +} + +func (r *Redis) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) { + options := register.NewListOptions(opts...) + b := r.pool.Get() + defer func() { + b.Reset() + r.pool.Put(b) + }() + + // TODO: replace Keys with Scan + keys, err := r.cli.Keys(ctx, r.getKey(b, options.Namespace, "*", "", "")).Result() + if err != nil { + return nil, err + } + if len(keys) == 0 { + return nil, nil + } + + vals, err := r.cli.MGet(ctx, keys...).Result() + if err != nil { + return nil, err + } + if len(vals) == 0 { + return nil, nil + } + + servicesMap := make(map[string]*register.Service) + for idx := range keys { + eidx := strings.LastIndex(keys[idx], separator) + sidx := strings.LastIndex(keys[idx][:eidx], separator) + if string(keys[idx][sidx]) == separator { + sidx++ + } + name := keys[idx][sidx:eidx] + svc, ok := servicesMap[name] + if !ok { + p := strings.Split(name, "-") + svc = ®ister.Service{Name: p[0], Version: p[1]} + } + + switch v := vals[idx].(type) { + case string: + node := ®ister.Node{} + if err = r.opts.Codec.Unmarshal([]byte(v), node); err != nil { + return nil, err + } + svc.Nodes = append(svc.Nodes, node) + case []byte: + node := ®ister.Node{} + if err = r.opts.Codec.Unmarshal(v, node); err != nil { + return nil, err + } + svc.Nodes = append(svc.Nodes, node) + } + + servicesMap[name] = svc + } + + svcs := make([]*register.Service, 0, len(servicesMap)) + for _, svc := range servicesMap { + svcs = append(svcs, svc) + } + + return svcs, nil +} + +func (r *Redis) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) { + id, err := id.New() + if err != nil { + return nil, err + } + wo := register.NewWatchOptions(opts...) + + w := &Watcher{ + exit: make(chan bool), + res: make(chan *register.Result), + id: id, + wo: wo, + sub: r.cli.Subscribe(ctx, wo.Namespace+separator+"events"), + codec: r.opts.Codec, + } + + r.Lock() + r.watchers[w.id] = w + r.Unlock() + + return w, nil +} + +func (r *Redis) Name() string { + return r.opts.Name +} + +func (r *Redis) String() string { + return "redis" +} + +type Watcher struct { + res chan *register.Result + exit chan bool + wo register.WatchOptions + id string + codec codec.Codec + sub *goredis.PubSub +} + +func (w *Watcher) Next() (*register.Result, error) { + var err error + for { + select { + case msg := <-w.sub.Channel(): + evt := ®ister.Event{} + if err = w.codec.Unmarshal([]byte(msg.Payload), evt); err != nil { + return nil, err + } + + if evt.Service == nil { + continue + } + + if len(w.wo.Service) > 0 && w.wo.Service != evt.Service.Name { + continue + } + + namespace := register.DefaultNamespace + if evt.Service.Namespace != "" { + namespace = evt.Service.Namespace + } + + // only send the event if watching the wildcard or this specific domain + if w.wo.Namespace == register.WildcardNamespace || w.wo.Namespace == namespace { + return ®ister.Result{Service: evt.Service, Action: evt.Type}, nil + } + + case <-w.exit: + return nil, register.ErrWatcherStopped + } + } +} + +func (w *Watcher) Stop() { + select { + case <-w.exit: + return + default: + w.sub.Close() + close(w.exit) + } +} + +func (r *Redis) getKey(b *strings.Builder, opNamespace string, name string, version string, nid string) string { + if opNamespace != "" { + b.WriteString(opNamespace) + b.WriteString(separator) + } + + if name != "" { + b.WriteString(name) + } + + if version != "" { + b.WriteString("-") + b.WriteString(version) + } + + if nid != "" { + b.WriteString(separator) + b.WriteString(nid) + } + return b.String() +} diff --git a/redis_test.go b/redis_test.go new file mode 100644 index 0000000..95ea37c --- /dev/null +++ b/redis_test.go @@ -0,0 +1,188 @@ +package redis + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "go.unistack.org/micro/v3/register" +) + +func TestRegister(t *testing.T) { + ctx := context.Background() + s := miniredis.RunT(t) + + r := NewRegister(register.Addrs(s.Addr())) + if err := r.Init(); err != nil { + t.Fatal(err) + } + if err := r.Connect(ctx); err != nil { + t.Fatal(err) + } + + defer func() { + if err := r.Disconnect(ctx); err != nil { + t.Fatal(err) + } + }() + + nodes := []*register.Node{ + {ID: "1", Address: "11.22.33.44"}, + } + svc := ®ister.Service{Name: "test", Version: "1.2.0", Nodes: nodes} + if err := r.Register(ctx, svc, register.RegisterTTL(100*time.Millisecond)); err != nil { + t.Fatal(err) + } + + svcs, err := r.ListServices(ctx) + if err != nil { + t.Fatal(err) + } else if len(svcs) == 0 { + t.Fatalf("no services registered") + } + if svcs[0].Name != "test" || svcs[0].Version != "1.2.0" { + t.Fatalf("invalid service %#+v", svcs[0]) + } + + svcs, err = r.LookupService(ctx, "test") + if err != nil { + t.Fatal(err) + } else if len(svcs) == 0 { + t.Fatalf("no services registered") + } + if svcs[0].Name != "test" || svcs[0].Version != "1.2.0" { + t.Fatalf("invalid service %d %#+v", len(svcs), svcs[0]) + } + + s.FastForward(200 * time.Millisecond) + + svcs, err = r.ListServices(ctx) + if err != nil { + t.Fatal(err) + } else if len(svcs) != 0 { + t.Fatalf("have registered services") + } +} + +func TestDeregister(t *testing.T) { + ctx := context.Background() + s := miniredis.RunT(t) + + r := NewRegister(register.Addrs(s.Addr())) + if err := r.Init(); err != nil { + t.Fatal(err) + } + if err := r.Connect(ctx); err != nil { + t.Fatal(err) + } + + defer func() { + if err := r.Disconnect(ctx); err != nil { + t.Fatal(err) + } + }() + + nodes := []*register.Node{ + {ID: "1", Address: "11.22.33.44"}, + } + svc := ®ister.Service{Name: "test", Version: "1.2.0", Nodes: nodes} + if err := r.Register(ctx, svc, register.RegisterTTL(500*time.Millisecond)); err != nil { + t.Fatal(err) + } + + svcs, err := r.LookupService(ctx, "test") + if err != nil { + t.Fatal(err) + } else if len(svcs) == 0 { + t.Fatalf("no services registered") + } + if svcs[0].Name != "test" || svcs[0].Version != "1.2.0" { + t.Fatalf("invalid service %#+v", svcs[0]) + } + + if err := r.Deregister(ctx, svc); err != nil { + t.Fatal(err) + } + + svcs, err = r.LookupService(ctx, "test") + if err == nil { + t.Fatalf("service not deregistered") + } else if len(svcs) != 0 { + t.Fatalf("service not deregistered") + } +} + +func TestWatch(t *testing.T) { + ctx := context.Background() + s := miniredis.RunT(t) + + r := NewRegister(register.Addrs(s.Addr())) + if err := r.Init(); err != nil { + t.Fatal(err) + } + if err := r.Connect(ctx); err != nil { + t.Fatal(err) + } + + defer func() { + if err := r.Disconnect(ctx); err != nil { + t.Fatal(err) + } + }() + + w, err := r.Watch(ctx) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + cherr := make(chan error) + go func() { + for { + res, err := w.Next() + if err != nil { + cherr <- err + } + if res.Action != register.EventCreate { + cherr <- fmt.Errorf("invalid event %#+v", res) + } + if res.Service.Name != "test" { + cherr <- fmt.Errorf("invalid event %#+v", res) + } + if res.Service.Version != "1.2.0" { + cherr <- fmt.Errorf("invalid event %#+v", res) + } + if len(res.Service.Nodes) != 1 { + cherr <- fmt.Errorf("invalid event %#+v", res) + } + if res.Service.Nodes[0].ID != "1" { + cherr <- fmt.Errorf("invalid event %#+v", res) + } + if res.Service.Nodes[0].Address != "11.22.33.44" { + cherr <- fmt.Errorf("invalid event %#+v", res) + } + cherr <- nil + break + } + }() + + nodes := []*register.Node{ + {ID: "1", Address: "11.22.33.44"}, + } + + go func() { + for { + svc := ®ister.Service{Name: "test", Version: "1.2.0", Nodes: nodes} + if err := r.Register(ctx, svc, register.RegisterTTL(500*time.Millisecond)); err != nil { + cherr <- err + } + time.Sleep(300 * time.Millisecond) + } + }() + + if err = <-cherr; err != nil { + t.Fatal(err) + } +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..b971a98 --- /dev/null +++ b/stats.go @@ -0,0 +1,49 @@ +package redis + +import ( + "time" + + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/meter" +) + +var ( + PoolHitsTotal = "pool_hits_total" + PoolMissesTotal = "pool_misses_total" + PoolTimeoutTotal = "pool_timeout_total" + PoolConnTotalCurrent = "pool_conn_total_current" + PoolConnIdleCurrent = "pool_conn_idle_current" + PoolConnStaleTotal = "pool_conn_stale_total" +) + +type Statser interface { + PoolStats() *goredis.PoolStats +} + +func (r *Redis) statsMeter() { + var st Statser + + if r.cli != nil { + st = r.cli + } else { + return + } + + go func() { + ticker := time.NewTicker(meter.DefaultMeterStatsInterval) + defer ticker.Stop() + + for range ticker.C { + if st == nil { + return + } + stats := st.PoolStats() + r.opts.Meter.Counter(PoolHitsTotal).Set(uint64(stats.Hits)) + r.opts.Meter.Counter(PoolMissesTotal).Set(uint64(stats.Misses)) + r.opts.Meter.Counter(PoolTimeoutTotal).Set(uint64(stats.Timeouts)) + r.opts.Meter.Counter(PoolConnTotalCurrent).Set(uint64(stats.TotalConns)) + r.opts.Meter.Counter(PoolConnIdleCurrent).Set(uint64(stats.IdleConns)) + r.opts.Meter.Counter(PoolConnStaleTotal).Set(uint64(stats.StaleConns)) + } + }() +} diff --git a/tracer.go b/tracer.go new file mode 100644 index 0000000..a038dab --- /dev/null +++ b/tracer.go @@ -0,0 +1,128 @@ +package redis + +import ( + "context" + "fmt" + "net" + "strconv" + + rediscmd "github.com/redis/go-redis/extra/rediscmd/v9" + goredis "github.com/redis/go-redis/v9" + "go.unistack.org/micro/v3/tracer" +) + +func setTracing(rdb goredis.UniversalClient, tr tracer.Tracer, opts ...tracer.SpanOption) { + switch rdb := rdb.(type) { + case *goredis.Client: + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + case *goredis.ClusterClient: + rdb.OnNewNode(func(rdb *goredis.Client) { + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + }) + case *goredis.Ring: + rdb.OnNewNode(func(rdb *goredis.Client) { + opt := rdb.Options() + connString := formatDBConnString(opt.Network, opt.Addr) + rdb.AddHook(newTracingHook(connString, tr)) + }) + } +} + +type tracingHook struct { + tr tracer.Tracer + opts []tracer.SpanOption +} + +var _ goredis.Hook = (*tracingHook)(nil) + +func newTracingHook(connString string, tr tracer.Tracer, opts ...tracer.SpanOption) *tracingHook { + opts = append(opts, tracer.WithSpanKind(tracer.SpanKindClient)) + if connString != "" { + opts = append(opts, tracer.WithSpanLabels("db.connection_string", connString)) + } + + return &tracingHook{ + tr: tr, + opts: opts, + } +} + +func (h *tracingHook) DialHook(hook goredis.DialHook) goredis.DialHook { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + /* + _, span := h.tr.Start(ctx, "goredis.dial", h.opts...) + defer span.Finish() + */ + conn, err := hook(ctx, network, addr) + // recordError(span, err) + + return conn, err + } +} + +func (h *tracingHook) ProcessHook(hook goredis.ProcessHook) goredis.ProcessHook { + return func(ctx context.Context, cmd goredis.Cmder) error { + cmdString := rediscmd.CmdString(cmd) + var err error + + switch cmdString { + case "cluster slots": + break + default: + _, span := h.tr.Start(ctx, "sdk.database", append(h.opts, tracer.WithSpanLabels("db.statement", cmdString))...) + defer func() { + recordError(span, err) + span.Finish() + }() + } + + err = hook(ctx, cmd) + + return err + } +} + +func (h *tracingHook) ProcessPipelineHook(hook goredis.ProcessPipelineHook) goredis.ProcessPipelineHook { + return func(ctx context.Context, cmds []goredis.Cmder) error { + _, cmdsString := rediscmd.CmdsString(cmds) + + opts := append(h.opts, tracer.WithSpanLabels( + "db.database.num_cmd", strconv.Itoa(len(cmds)), + "db.statement", cmdsString, + )) + + _, span := h.tr.Start(ctx, "sdk.database", opts...) + defer span.Finish() + + err := hook(ctx, cmds) + recordError(span, err) + + return err + } +} + +func setSpanError(ctx context.Context, err error) { + if err == nil || err == goredis.Nil { + return + } + if sp, ok := tracer.SpanFromContext(ctx); !ok && sp != nil { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +func recordError(span tracer.Span, err error) { + if err != nil && err != goredis.Nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } +} + +func formatDBConnString(network, addr string) string { + if network == "tcp" { + network = "redis" + } + return fmt.Sprintf("%s://%s", network, addr) +}