Compare commits
15 Commits
Author | SHA1 | Date | |
---|---|---|---|
0ae3f3230d | |||
|
3edd5af68a | ||
5f8e1ebc0f | |||
bcc64d7b2f | |||
1adc91ef4b | |||
906b189f8b | |||
778f78f48d | |||
de0af10391 | |||
dbbdd81a57 | |||
c7b6158602 | |||
35b4ea057c | |||
46fbd9846a | |||
002a038413 | |||
13a4527f83 | |||
2a7ce9411b |
18
.gitea/ISSUE_TEMPLATE/bug_report.md
Normal file
18
.gitea/ISSUE_TEMPLATE/bug_report.md
Normal file
@ -0,0 +1,18 @@
|
||||
---
|
||||
name: Bug report
|
||||
about: For reporting bugs in micro
|
||||
title: "[BUG]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Describe the bug**
|
||||
|
||||
1. What are you trying to do?
|
||||
2. What did you expect to happen?
|
||||
3. What happens instead?
|
||||
|
||||
**How to reproduce the bug:**
|
||||
|
||||
If possible, please include a minimal code snippet here.
|
17
.gitea/ISSUE_TEMPLATE/feature-request---enhancement.md
Normal file
17
.gitea/ISSUE_TEMPLATE/feature-request---enhancement.md
Normal file
@ -0,0 +1,17 @@
|
||||
---
|
||||
name: Feature request / Enhancement
|
||||
about: If you have a need not served by micro
|
||||
title: "[FEATURE]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Is your feature request related to a problem? Please describe.**
|
||||
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||
|
||||
**Describe the solution you'd like**
|
||||
A clear and concise description of what you want to happen.
|
||||
|
||||
**Additional context**
|
||||
Add any other context or screenshots about the feature request here.
|
8
.gitea/ISSUE_TEMPLATE/question.md
Normal file
8
.gitea/ISSUE_TEMPLATE/question.md
Normal file
@ -0,0 +1,8 @@
|
||||
---
|
||||
name: Question
|
||||
about: Ask a question about micro
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
9
.gitea/PULL_REQUEST_TEMPLATE.md
Normal file
9
.gitea/PULL_REQUEST_TEMPLATE.md
Normal file
@ -0,0 +1,9 @@
|
||||
## Pull Request template
|
||||
Please, go through these steps before clicking submit on this PR.
|
||||
|
||||
1. Give a descriptive title to your PR.
|
||||
2. Provide a description of your changes.
|
||||
3. Make sure you have some relevant tests.
|
||||
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
|
||||
|
||||
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
|
28
.gitea/autoapprove.yml
Normal file
28
.gitea/autoapprove.yml
Normal file
@ -0,0 +1,28 @@
|
||||
name: "autoapprove"
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [assigned, opened, synchronize, reopened]
|
||||
workflow_run:
|
||||
workflows: ["prbuild"]
|
||||
types:
|
||||
- completed
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
autoapprove:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: approve
|
||||
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
|
||||
"chmod +x ./tea",
|
||||
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
|
||||
"./tea pr --repo ${{ github.event.repository.name }}"
|
||||
]
|
||||
if: github.actor == 'vtolstov'
|
||||
id: approve
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
29
.gitea/workflows/job_lint.yml
Normal file
29
.gitea/workflows/job_lint.yml
Normal file
@ -0,0 +1,29 @@
|
||||
name: lint
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
filter: 'blob:none'
|
||||
- name: setup go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
cache-dependency-path: "**/*.sum"
|
||||
go-version: 'stable'
|
||||
- name: setup deps
|
||||
run: go get -v ./...
|
||||
- name: run lint
|
||||
uses: https://github.com/golangci/golangci-lint-action@v6
|
||||
with:
|
||||
version: 'latest'
|
34
.gitea/workflows/job_test.yml
Normal file
34
.gitea/workflows/job_test.yml
Normal file
@ -0,0 +1,34 @@
|
||||
name: test
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
filter: 'blob:none'
|
||||
- name: setup go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
cache-dependency-path: "**/*.sum"
|
||||
go-version: 'stable'
|
||||
- name: setup deps
|
||||
run: go get -v ./...
|
||||
- name: run test
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
run: go test -mod readonly -v ./...
|
53
.gitea/workflows/job_tests.yml
Normal file
53
.gitea/workflows/job_tests.yml
Normal file
@ -0,0 +1,53 @@
|
||||
name: test
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
- v4
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
filter: 'blob:none'
|
||||
- name: checkout tests
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: master
|
||||
filter: 'blob:none'
|
||||
repository: unistack-org/micro-tests
|
||||
path: micro-tests
|
||||
- name: setup go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
cache-dependency-path: "**/*.sum"
|
||||
go-version: 'stable'
|
||||
- name: setup go work
|
||||
env:
|
||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
||||
run: |
|
||||
go work init
|
||||
go work use .
|
||||
go work use micro-tests
|
||||
- name: setup deps
|
||||
env:
|
||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
||||
run: go get -v ./...
|
||||
- name: run tests
|
||||
env:
|
||||
INTEGRATION_TESTS: yes
|
||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
||||
run: |
|
||||
cd micro-tests
|
||||
go test -mod readonly -v ./... || true
|
25
.gitignore
vendored
Normal file
25
.gitignore
vendored
Normal file
@ -0,0 +1,25 @@
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
*.exe~
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
bin
|
||||
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
cmd/test
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
||||
# Dependency directories (remove the comment below to include it)
|
||||
# vendor/
|
||||
|
||||
# Go workspace file
|
||||
go.work
|
||||
|
||||
# General
|
||||
.DS_Store
|
||||
.idea
|
||||
.vscode
|
5
.golangci.yml
Normal file
5
.golangci.yml
Normal file
@ -0,0 +1,5 @@
|
||||
run:
|
||||
concurrency: 8
|
||||
deadline: 5m
|
||||
issues-exit-code: 1
|
||||
tests: true
|
12
go.mod
12
go.mod
@ -1,13 +1,17 @@
|
||||
module go.unistack.org/micro-broker-redis/v3
|
||||
|
||||
go 1.23.1
|
||||
go 1.22
|
||||
|
||||
require (
|
||||
github.com/redis/go-redis/v9 v9.6.1
|
||||
go.unistack.org/micro/v3 v3.10.84
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0
|
||||
github.com/redis/go-redis/v9 v9.7.0
|
||||
go.unistack.org/micro/v3 v3.11.14
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
||||
google.golang.org/protobuf v1.35.2 // indirect
|
||||
)
|
||||
|
20
go.sum
20
go.sum
@ -2,11 +2,19 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
|
||||
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
|
||||
go.unistack.org/micro/v3 v3.10.84 h1:Fc38VoRnL+sFyVn8V/lx5T0sP/I4TKuQ61ium0fs6l4=
|
||||
go.unistack.org/micro/v3 v3.10.84/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0 h1:BIx9TNZH/Jsr4l1i7VVxnV0JPiwYj8qyrHyuL0fGZrk=
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.7.0/go.mod h1:eTg/YQtGYAZD5r3DlGlJptJ45AHA+/G+2NPn30PKzik=
|
||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
||||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
||||
go.unistack.org/micro/v3 v3.11.14 h1:3e9T30Ih9cvqZTCD8inG1qsBWRk4x5ZinWuTiDFM4CE=
|
||||
go.unistack.org/micro/v3 v3.11.14/go.mod h1:k++F5Ej4LIy3XnOW/oj3P7B97wp2t9yLSlqtUzMpatM=
|
||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
|
||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
|
18
options.go
18
options.go
@ -7,20 +7,13 @@ import (
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/meter"
|
||||
"go.unistack.org/micro/v3/store"
|
||||
"go.unistack.org/micro/v3/tracer"
|
||||
)
|
||||
|
||||
type configKey struct{}
|
||||
|
||||
func Config(c *redis.Options) store.Option {
|
||||
return store.SetOption(configKey{}, c)
|
||||
}
|
||||
|
||||
type clusterConfigKey struct{}
|
||||
|
||||
func ClusterConfig(c *redis.ClusterOptions) store.Option {
|
||||
return store.SetOption(clusterConfigKey{}, c)
|
||||
func Config(c *redis.UniversalOptions) broker.Option {
|
||||
return broker.SetOption(configKey{}, c)
|
||||
}
|
||||
|
||||
var (
|
||||
@ -52,19 +45,12 @@ func NewOptions(opts ...Option) Options {
|
||||
Meter: meter.DefaultMeter,
|
||||
Tracer: tracer.DefaultTracer,
|
||||
MeterStatsInterval: DefaultMeterStatsInterval,
|
||||
MeterMetricPrefix: DefaultMeterMetricPrefix,
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
options.Meter = options.Meter.Clone(
|
||||
meter.MetricPrefix(options.MeterMetricPrefix),
|
||||
)
|
||||
|
||||
options.Logger = options.Logger.Clone(logger.WithCallerSkipCount(1))
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
|
69
redis.go
69
redis.go
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
@ -35,9 +36,9 @@ var (
|
||||
// Event is an broker.Event
|
||||
type Event struct {
|
||||
ctx context.Context
|
||||
topic string
|
||||
msg *broker.Message
|
||||
err error
|
||||
msg *broker.Message
|
||||
topic string
|
||||
}
|
||||
|
||||
// Topic returns the topic this Event applies to.
|
||||
@ -106,6 +107,8 @@ func (s *Subscriber) loop() {
|
||||
}
|
||||
|
||||
err := s.opts.Codec.Unmarshal([]byte(msg.Payload), p.msg)
|
||||
p.ctx = metadata.NewIncomingContext(s.ctx, p.msg.Header)
|
||||
|
||||
if err != nil {
|
||||
p.msg.Body = codec.RawMessage(msg.Payload)
|
||||
if eh != nil {
|
||||
@ -150,9 +153,22 @@ func (s *Subscriber) Unsubscribe(ctx context.Context) error {
|
||||
|
||||
// Broker implements broker.Broker interface
|
||||
type Broker struct {
|
||||
opts broker.Options
|
||||
cli redis.UniversalClient
|
||||
done chan struct{}
|
||||
opts broker.Options
|
||||
cli redis.UniversalClient
|
||||
done chan struct{}
|
||||
connected *atomic.Uint32
|
||||
}
|
||||
|
||||
func (b *Broker) Live() bool {
|
||||
return b.connected.Load() == 1
|
||||
}
|
||||
|
||||
func (b *Broker) Ready() bool {
|
||||
return b.connected.Load() == 1
|
||||
}
|
||||
|
||||
func (b *Broker) Health() bool {
|
||||
return b.connected.Load() == 1
|
||||
}
|
||||
|
||||
// String returns the name of the broker implementation
|
||||
@ -223,6 +239,7 @@ func (b *Broker) BatchSubscribe(ctx context.Context, topic string, handler broke
|
||||
// Subscribe returns a broker.Subscriber for the topic and handler
|
||||
func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
||||
s := &Subscriber{
|
||||
ctx: ctx,
|
||||
topic: topic,
|
||||
handle: handler,
|
||||
opts: b.opts,
|
||||
@ -230,69 +247,64 @@ func (b *Broker) Subscribe(ctx context.Context, topic string, handler broker.Han
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Run the receiver routine.
|
||||
go s.loop()
|
||||
|
||||
s.sub = b.cli.Subscribe(s.ctx, s.topic)
|
||||
if err := s.sub.Ping(ctx, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go s.loop()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (b *Broker) configure() error {
|
||||
redisOptions := DefaultOptions
|
||||
|
||||
if b.cli != nil && b.opts.Context == nil {
|
||||
func (b *Broker) configure(opts ...broker.Option) error {
|
||||
if b.connected.Load() == 1 && len(opts) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
redisOptions := DefaultOptions
|
||||
|
||||
if b.opts.Context != nil {
|
||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok {
|
||||
if c, ok := b.opts.Context.Value(configKey{}).(*redis.UniversalOptions); ok && c != nil {
|
||||
redisOptions = c
|
||||
if b.opts.TLSConfig != nil {
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if redisOptions == nil && b.cli != nil {
|
||||
return nil
|
||||
if len(b.opts.Addrs) > 0 {
|
||||
redisOptions.Addrs = b.opts.Addrs
|
||||
}
|
||||
|
||||
if redisOptions == nil {
|
||||
redisOptions.Addrs = b.opts.Addrs
|
||||
if b.opts.TLSConfig != nil {
|
||||
redisOptions.TLSConfig = b.opts.TLSConfig
|
||||
}
|
||||
|
||||
c := redis.NewUniversalClient(redisOptions)
|
||||
setTracing(c, b.opts.Tracer)
|
||||
|
||||
b.cli = c
|
||||
b.statsMeter()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Broker) Connect(ctx context.Context) error {
|
||||
if b.connected.Load() == 1 {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
if b.cli != nil {
|
||||
err = b.cli.Ping(ctx).Err()
|
||||
}
|
||||
setSpanError(ctx, err)
|
||||
b.done = make(chan struct{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *Broker) Init(opts ...broker.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&b.opts)
|
||||
}
|
||||
|
||||
err := b.configure()
|
||||
err := b.configure(opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -304,6 +316,9 @@ func (b *Broker) Client() redis.UniversalClient {
|
||||
}
|
||||
|
||||
func (b *Broker) Disconnect(ctx context.Context) error {
|
||||
if b.connected.Load() == 0 {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
select {
|
||||
case <-b.done:
|
||||
@ -318,5 +333,5 @@ func (b *Broker) Disconnect(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func NewBroker(opts ...broker.Option) *Broker {
|
||||
return &Broker{done: make(chan struct{}), opts: broker.NewOptions(opts...)}
|
||||
return &Broker{connected: &atomic.Uint32{}, opts: broker.NewOptions(opts...)}
|
||||
}
|
||||
|
4
stats.go
4
stats.go
@ -13,10 +13,6 @@ var (
|
||||
PoolConnTotalCurrent = "pool_conn_total_current"
|
||||
PoolConnIdleCurrent = "pool_conn_idle_current"
|
||||
PoolConnStaleTotal = "pool_conn_stale_total"
|
||||
|
||||
meterRequestTotal = "request_total"
|
||||
meterRequestLatencyMicroseconds = "latency_microseconds"
|
||||
meterRequestDurationSeconds = "request_duration_seconds"
|
||||
)
|
||||
|
||||
type Statser interface {
|
||||
|
Loading…
x
Reference in New Issue
Block a user