Compare commits

...

15 Commits
master ... v3

Author SHA1 Message Date
0ae3f3230d Merge pull request 'Add golangci.yml' (#2) from atolstikhin/micro-broker-redis:v3 into v3
All checks were successful
test / test (push) Successful in 12m30s
Reviewed-on: #2
2024-12-14 15:36:38 +03:00
Aleksandr Tolstikhin
3edd5af68a Add golangci.yml
Some checks failed
lint / lint (pull_request) Has been cancelled
test / test (pull_request) Has been cancelled
2024-12-14 01:20:33 +07:00
5f8e1ebc0f update go.mod
All checks were successful
test / test (push) Successful in 12m26s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-12 01:51:50 +03:00
bcc64d7b2f fixup for latest micro
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-11 18:57:54 +03:00
1adc91ef4b update deps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-10-24 17:35:53 +03:00
906b189f8b Merge pull request 'update deps' (#1) from devstigneev/micro-broker-redis:v3 into v3
Reviewed-on: #1
2024-10-09 17:22:44 +03:00
778f78f48d update deps 2024-10-09 17:09:45 +03:00
de0af10391 update deps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 18:08:47 +03:00
dbbdd81a57 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:30:46 +03:00
c7b6158602 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:17:59 +03:00
35b4ea057c fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 09:08:01 +03:00
46fbd9846a fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:56:52 +03:00
002a038413 fixup redis broker
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:45:22 +03:00
13a4527f83 update go deps
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-10 08:12:39 +03:00
2a7ce9411b fixup options
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-09 18:58:31 +03:00
15 changed files with 292 additions and 57 deletions

View File

@ -0,0 +1,18 @@
---
name: Bug report
about: For reporting bugs in micro
title: "[BUG]"
labels: ''
assignees: ''
---
**Describe the bug**
1. What are you trying to do?
2. What did you expect to happen?
3. What happens instead?
**How to reproduce the bug:**
If possible, please include a minimal code snippet here.

View File

@ -0,0 +1,17 @@
---
name: Feature request / Enhancement
about: If you have a need not served by micro
title: "[FEATURE]"
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@ -0,0 +1,8 @@
---
name: Question
about: Ask a question about micro
title: ''
labels: ''
assignees: ''
---

View File

@ -0,0 +1,9 @@
## Pull Request template
Please, go through these steps before clicking submit on this PR.
1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

28
.gitea/autoapprove.yml Normal file
View File

@ -0,0 +1,28 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
"chmod +x ./tea",
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
"./tea pr --repo ${{ github.event.repository.name }}"
]
if: github.actor == 'vtolstov'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -0,0 +1,29 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6
with:
version: 'latest'

View File

@ -0,0 +1,34 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

View File

@ -0,0 +1,53 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches:
- master
- v3
- v4
push:
branches:
- master
- v3
- v4
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

25
.gitignore vendored Normal file
View File

@ -0,0 +1,25 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
cmd/test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

5
.golangci.yml Normal file
View File

@ -0,0 +1,5 @@
run:
concurrency: 8
deadline: 5m
issues-exit-code: 1
tests: true

12
go.mod
View File

@ -1,13 +1,17 @@
module go.unistack.org/micro-broker-redis/v3 module go.unistack.org/micro-broker-redis/v3
go 1.23.1 go 1.22
require ( require (
github.com/redis/go-redis/v9 v9.6.1 github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0
go.unistack.org/micro/v3 v3.10.84 github.com/redis/go-redis/v9 v9.7.0
go.unistack.org/micro/v3 v3.11.14
) )
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/go-cmp v0.6.0 // indirect
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
) )

20
go.sum
View File

@ -2,11 +2,19 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4= github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk=
go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
go.unistack.org/micro/v3 v3.11.14 h1:3e9T30Ih9cvqZTCD8inG1qsBWRk4x5ZinWuTiDFM4CE=
go.unistack.org/micro/v3 v3.11.14/go.mod h1:k++F5Ej4LIy3XnOW/oj3P7B97wp2t9yLSlqtUzMpatM=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=

View File

@ -7,20 +7,13 @@ import (
"go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/store"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/tracer"
) )
type configKey struct{} type configKey struct{}
func Config(c *redis.Options) store.Option { func Config(c *redis.UniversalOptions) broker.Option {
return store.SetOption(configKey{}, c) return broker.SetOption(configKey{}, c)
}
type clusterConfigKey struct{}
func ClusterConfig(c *redis.ClusterOptions) store.Option {
return store.SetOption(clusterConfigKey{}, c)
} }
var ( var (
@ -52,19 +45,12 @@ func NewOptions(opts ...Option) Options {
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
MeterStatsInterval: DefaultMeterStatsInterval, MeterStatsInterval: DefaultMeterStatsInterval,
MeterMetricPrefix: DefaultMeterMetricPrefix,
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
options.Meter = options.Meter.Clone(
meter.MetricPrefix(options.MeterMetricPrefix),
)
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
return options return options
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
@ -35,9 +36,9 @@ var (
// Event is an broker.Event // Event is an broker.Event
type Event struct { type Event struct {
ctx context.Context ctx context.Context
topic string
msg *broker.Message
err error err error
msg *broker.Message
topic string
} }
// Topic returns the topic this Event applies to. // Topic returns the topic this Event applies to.
@ -106,6 +107,8 @@ func (s *Subscriber) loop() {
} }
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg) err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
if err != nil { if err != nil {
p.msg.Body = codec.RawMessage(msg.Payload) p.msg.Body = codec.RawMessage(msg.Payload)
if eh != nil { if eh != nil {
@ -153,6 +156,19 @@ type Broker struct {
opts broker.Options opts broker.Options
cli redis.UniversalClient cli redis.UniversalClient
done chan struct{} done chan struct{}
connected *atomic.Uint32
}
func (b *Broker) Live() bool {
return b.connected.Load() == 1
}
func (b *Broker) Ready() bool {
return b.connected.Load() == 1
}
func (b *Broker) Health() bool {
return b.connected.Load() == 1
} }
// String returns the name of the broker implementation // String returns the name of the broker implementation
@ -223,6 +239,7 @@ func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broke
// Subscribe returns a broker.Subscriber for the topic and handler // Subscribe returns a broker.Subscriber for the topic and handler
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
s := &Subscriber{ s := &Subscriber{
ctx: ctx,
topic: topic, topic: topic,
handle: handler, handle: handler,
opts: b.opts, opts: b.opts,
@ -230,69 +247,64 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
done: make(chan struct{}), done: make(chan struct{}),
} }
// Run the receiver routine.
go s.loop()
s.sub = b.cli.Subscribe(s.ctx, s.topic) s.sub = b.cli.Subscribe(s.ctx, s.topic)
if err := s.sub.Ping(ctx, ""); err != nil { if err := s.sub.Ping(ctx, ""); err != nil {
return nil, err return nil, err
} }
go s.loop()
return s, nil return s, nil
} }
func (b *Broker) configure() error { func (b *Broker) configure(opts ...broker.Option) error {
if b.connected.Load() == 1 && len(opts) == 0 {
return nil
}
redisOptions := DefaultOptions redisOptions := DefaultOptions
if b.cli != nil && b.opts.Context == nil {
return nil
}
if b.opts.Context != nil { if b.opts.Context != nil {
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok { if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
redisOptions = c redisOptions = c
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig
}
} }
} }
if redisOptions == nil && b.cli != nil { if len(b.opts.Addrs) > 0 {
return nil
}
if redisOptions == nil {
redisOptions.Addrs = b.opts.Addrs redisOptions.Addrs = b.opts.Addrs
}
if b.opts.TLSConfig != nil {
redisOptions.TLSConfig = b.opts.TLSConfig redisOptions.TLSConfig = b.opts.TLSConfig
} }
c := redis.NewUniversalClient(redisOptions) c := redis.NewUniversalClient(redisOptions)
setTracing(c, b.opts.Tracer) setTracing(c, b.opts.Tracer)
b.cli = c
b.statsMeter() b.statsMeter()
return nil return nil
} }
func (b *Broker) Connect(ctx context.Context) error { func (b *Broker) Connect(ctx context.Context) error {
if b.connected.Load() == 1 {
return nil
}
var err error var err error
if b.cli != nil { if b.cli != nil {
err = b.cli.Ping(ctx).Err() err = b.cli.Ping(ctx).Err()
} }
setSpanError(ctx, err) setSpanError(ctx, err)
b.done = make(chan struct{})
return err return err
} }
func (b *Broker) Init(opts ...broker.Option) error { func (b *Broker) Init(opts ...broker.Option) error {
for _, o := range opts { err := b.configure(opts...)
o(&b.opts)
}
err := b.configure()
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
@ -304,6 +316,9 @@ func (b *Broker) Client() redis.UniversalClient {
} }
func (b *Broker) Disconnect(ctx context.Context) error { func (b *Broker) Disconnect(ctx context.Context) error {
if b.connected.Load() == 0 {
return nil
}
var err error var err error
select { select {
case <-b.done: case <-b.done:
@ -318,5 +333,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
} }
func NewBroker(opts ...broker.Option) *Broker { func NewBroker(opts ...broker.Option) *Broker {
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)} return &Broker{connected: &atomic.Uint32{}, opts: broker.NewOptions(opts...)}
} }

View File

@ -13,10 +13,6 @@ var (
PoolConnTotalCurrent = "pool_conn_total_current" PoolConnTotalCurrent = "pool_conn_total_current"
PoolConnIdleCurrent = "pool_conn_idle_current" PoolConnIdleCurrent = "pool_conn_idle_current"
PoolConnStaleTotal = "pool_conn_stale_total" PoolConnStaleTotal = "pool_conn_stale_total"
meterRequestTotal = "request_total"
meterRequestLatencyMicroseconds = "latency_microseconds"
meterRequestDurationSeconds = "request_duration_seconds"
) )
type Statser interface { type Statser interface {