Compare commits
25 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
c4d90c8a2b | ||
2b3c413adc | |||
20fb19fee9 | |||
8e3c56f4ed | |||
fcbae6f94a | |||
2f818d389b | |||
d55cb59531 | |||
8c416b10ef | |||
d3c2ae5f54 | |||
3bb8ec0753 | |||
27bca35eb6 | |||
498218912c | |||
bd04f5b9cb | |||
dc976006ad | |||
934ebf6c0a | |||
6d8fce53dd | |||
f12f3fb2c2 | |||
|
5a755437c9 | ||
05db1f3dae | |||
e4ba134fa6 | |||
8a85989b79 | |||
56185faabe | |||
823b2bdc52 | |||
48906c5612 | |||
23a1ff424e |
@@ -1,14 +0,0 @@
|
|||||||
---
|
|
||||||
name: Question
|
|
||||||
about: Ask a question about go-micro
|
|
||||||
title: ''
|
|
||||||
labels: ''
|
|
||||||
assignees: ''
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
Before asking, please check if your question has already been answered:
|
|
||||||
|
|
||||||
1. Check the documentation - https://micro.mu/docs/
|
|
||||||
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
|
|
||||||
3. Search existing issues
|
|
@@ -1,6 +1,6 @@
|
|||||||
---
|
---
|
||||||
name: Bug report
|
name: Bug report
|
||||||
about: For reporting bugs in go-micro
|
about: For reporting bugs in micro
|
||||||
title: "[BUG]"
|
title: "[BUG]"
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
@@ -16,9 +16,3 @@ assignees: ''
|
|||||||
**How to reproduce the bug:**
|
**How to reproduce the bug:**
|
||||||
|
|
||||||
If possible, please include a minimal code snippet here.
|
If possible, please include a minimal code snippet here.
|
||||||
|
|
||||||
**Environment:**
|
|
||||||
Go Version: please paste `go version` output here
|
|
||||||
```
|
|
||||||
please paste `go env` output here
|
|
||||||
```
|
|
@@ -1,6 +1,6 @@
|
|||||||
---
|
---
|
||||||
name: Feature request / Enhancement
|
name: Feature request / Enhancement
|
||||||
about: If you have a need not served by go-micro
|
about: If you have a need not served by micro
|
||||||
title: "[FEATURE]"
|
title: "[FEATURE]"
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
8
.github/ISSUE_TEMPLATE/question.md
vendored
Normal file
8
.github/ISSUE_TEMPLATE/question.md
vendored
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
---
|
||||||
|
name: Question
|
||||||
|
about: Ask a question about micro
|
||||||
|
title: ''
|
||||||
|
labels: ''
|
||||||
|
assignees: ''
|
||||||
|
|
||||||
|
---
|
28
.github/autoapprove.yml
vendored
Normal file
28
.github/autoapprove.yml
vendored
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
name: "autoapprove"
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request_target:
|
||||||
|
types: [assigned, opened, synchronize, reopened]
|
||||||
|
workflow_run:
|
||||||
|
workflows: ["prbuild"]
|
||||||
|
types:
|
||||||
|
- completed
|
||||||
|
|
||||||
|
permissions:
|
||||||
|
pull-requests: write
|
||||||
|
contents: write
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
autoapprove:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: approve
|
||||||
|
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
|
||||||
|
"chmod +x ./tea",
|
||||||
|
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
|
||||||
|
"./tea pr --repo ${{ github.event.repository.name }}"
|
||||||
|
]
|
||||||
|
if: github.actor == 'vtolstov'
|
||||||
|
id: approve
|
||||||
|
with:
|
||||||
|
github-token: ${{ secrets.GITHUB_TOKEN }}
|
53
.github/workflows/job_coverage.yml
vendored
Normal file
53
.github/workflows/job_coverage.yml
vendored
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
name: coverage
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [ main, v3, v4 ]
|
||||||
|
paths-ignore:
|
||||||
|
- '.github/**'
|
||||||
|
- '.gitea/**'
|
||||||
|
pull_request:
|
||||||
|
branches: [ main, v3, v4 ]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
|
||||||
|
build:
|
||||||
|
if: github.server_url != 'https://github.com'
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
with:
|
||||||
|
filter: 'blob:none'
|
||||||
|
|
||||||
|
- name: setup go
|
||||||
|
uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
cache-dependency-path: "**/*.sum"
|
||||||
|
go-version: 'stable'
|
||||||
|
|
||||||
|
- name: test coverage
|
||||||
|
run: |
|
||||||
|
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
|
||||||
|
go tool cover -func coverage.out -o coverage.out
|
||||||
|
|
||||||
|
- name: coverage badge
|
||||||
|
uses: tj-actions/coverage-badge-go@v2
|
||||||
|
with:
|
||||||
|
green: 80
|
||||||
|
filename: coverage.out
|
||||||
|
|
||||||
|
- uses: stefanzweifel/git-auto-commit-action@v4
|
||||||
|
name: autocommit
|
||||||
|
with:
|
||||||
|
commit_message: Apply Code Coverage Badge
|
||||||
|
skip_fetch: false
|
||||||
|
skip_checkout: false
|
||||||
|
file_pattern: ./README.md
|
||||||
|
|
||||||
|
- name: push
|
||||||
|
if: steps.auto-commit-action.outputs.changes_detected == 'true'
|
||||||
|
uses: ad-m/github-push-action@master
|
||||||
|
with:
|
||||||
|
github_token: ${{ github.token }}
|
||||||
|
branch: ${{ github.ref }}
|
@@ -3,10 +3,10 @@ name: lint
|
|||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, reopened, synchronize]
|
types: [opened, reopened, synchronize]
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
paths-ignore:
|
||||||
- v3
|
- '.github/**'
|
||||||
- v4
|
- '.gitea/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
lint:
|
lint:
|
||||||
@@ -24,6 +24,6 @@ jobs:
|
|||||||
- name: setup deps
|
- name: setup deps
|
||||||
run: go get -v ./...
|
run: go get -v ./...
|
||||||
- name: run lint
|
- name: run lint
|
||||||
uses: https://github.com/golangci/golangci-lint-action@v6
|
uses: golangci/golangci-lint-action@v6
|
||||||
with:
|
with:
|
||||||
version: 'latest'
|
version: 'latest'
|
94
.github/workflows/job_sync.yml
vendored
Normal file
94
.github/workflows/job_sync.yml
vendored
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
name: sync
|
||||||
|
|
||||||
|
on:
|
||||||
|
schedule:
|
||||||
|
- cron: '*/5 * * * *'
|
||||||
|
# Allows you to run this workflow manually from the Actions tab
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
sync:
|
||||||
|
if: github.server_url != 'https://github.com'
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: init
|
||||||
|
run: |
|
||||||
|
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
|
||||||
|
git config --global user.name "github-actions[bot]"
|
||||||
|
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
|
||||||
|
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
|
||||||
|
|
||||||
|
- name: check master
|
||||||
|
id: check_master
|
||||||
|
run: |
|
||||||
|
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
||||||
|
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
||||||
|
echo "src_hash=$src_hash"
|
||||||
|
echo "dst_hash=$dst_hash"
|
||||||
|
if [ "$src_hash" != "$dst_hash" ]; then
|
||||||
|
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||||
|
else
|
||||||
|
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||||
|
fi
|
||||||
|
|
||||||
|
- name: sync master
|
||||||
|
if: steps.check_master.outputs.sync_needed == 'true'
|
||||||
|
run: |
|
||||||
|
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
|
cd repo
|
||||||
|
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
|
git pull --rebase upstream master
|
||||||
|
git push upstream master --progress
|
||||||
|
git push origin master --progress
|
||||||
|
cd ../
|
||||||
|
rm -rf repo
|
||||||
|
|
||||||
|
- name: check v3
|
||||||
|
id: check_v3
|
||||||
|
run: |
|
||||||
|
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
||||||
|
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
||||||
|
echo "src_hash=$src_hash"
|
||||||
|
echo "dst_hash=$dst_hash"
|
||||||
|
if [ "$src_hash" != "$dst_hash" ]; then
|
||||||
|
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||||
|
else
|
||||||
|
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||||
|
fi
|
||||||
|
|
||||||
|
- name: sync v3
|
||||||
|
if: steps.check_v3.outputs.sync_needed == 'true'
|
||||||
|
run: |
|
||||||
|
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
|
cd repo
|
||||||
|
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
|
git pull --rebase upstream v3
|
||||||
|
git push upstream v3 --progress
|
||||||
|
git push origin v3 --progress
|
||||||
|
cd ../
|
||||||
|
rm -rf repo
|
||||||
|
|
||||||
|
- name: check v4
|
||||||
|
id: check_v4
|
||||||
|
run: |
|
||||||
|
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
||||||
|
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
||||||
|
echo "src_hash=$src_hash"
|
||||||
|
echo "dst_hash=$dst_hash"
|
||||||
|
if [ "$src_hash" != "$dst_hash" ]; then
|
||||||
|
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
||||||
|
else
|
||||||
|
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
||||||
|
fi
|
||||||
|
|
||||||
|
- name: sync v4
|
||||||
|
if: steps.check_v4.outputs.sync_needed == 'true'
|
||||||
|
run: |
|
||||||
|
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
||||||
|
cd repo
|
||||||
|
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
||||||
|
git pull --rebase upstream v4
|
||||||
|
git push upstream v4 --progress
|
||||||
|
git push origin v4 --progress
|
||||||
|
cd ../
|
||||||
|
rm -rf repo
|
@@ -3,15 +3,12 @@ name: test
|
|||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, reopened, synchronize]
|
types: [opened, reopened, synchronize]
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
|
||||||
- v3
|
|
||||||
- v4
|
|
||||||
push:
|
push:
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
paths-ignore:
|
||||||
- v3
|
- '.github/**'
|
||||||
- v4
|
- '.gitea/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
@@ -3,15 +3,12 @@ name: test
|
|||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, reopened, synchronize]
|
types: [opened, reopened, synchronize]
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
|
||||||
- v3
|
|
||||||
- v4
|
|
||||||
push:
|
push:
|
||||||
branches:
|
branches: [ master, v3, v4 ]
|
||||||
- master
|
paths-ignore:
|
||||||
- v3
|
- '.github/**'
|
||||||
- v4
|
- '.gitea/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
test:
|
||||||
@@ -35,19 +32,19 @@ jobs:
|
|||||||
go-version: 'stable'
|
go-version: 'stable'
|
||||||
- name: setup go work
|
- name: setup go work
|
||||||
env:
|
env:
|
||||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
GOWORK: ${{ github.workspace }}/go.work
|
||||||
run: |
|
run: |
|
||||||
go work init
|
go work init
|
||||||
go work use .
|
go work use .
|
||||||
go work use micro-tests
|
go work use micro-tests
|
||||||
- name: setup deps
|
- name: setup deps
|
||||||
env:
|
env:
|
||||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
GOWORK: ${{ github.workspace }}/go.work
|
||||||
run: go get -v ./...
|
run: go get -v ./...
|
||||||
- name: run tests
|
- name: run tests
|
||||||
env:
|
env:
|
||||||
INTEGRATION_TESTS: yes
|
INTEGRATION_TESTS: yes
|
||||||
GOWORK: /workspace/${{ github.repository_owner }}/go.work
|
GOWORK: ${{ github.workspace }}/go.work
|
||||||
run: |
|
run: |
|
||||||
cd micro-tests
|
cd micro-tests
|
||||||
go test -mod readonly -v ./... || true
|
go test -mod readonly -v ./... || true
|
@@ -1,5 +1,5 @@
|
|||||||
run:
|
run:
|
||||||
concurrency: 8
|
concurrency: 8
|
||||||
deadline: 5m
|
timeout: 5m
|
||||||
issues-exit-code: 1
|
issues-exit-code: 1
|
||||||
tests: true
|
tests: true
|
||||||
|
4
README.md
Normal file
4
README.md
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
# GRPC Server
|
||||||
|

|
||||||
|
|
||||||
|
This plugin is a grpc server for micro.
|
2
codec.go
2
codec.go
@@ -1,7 +1,7 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
2
error.go
2
error.go
@@ -6,7 +6,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/errors"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
|
//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
|
||||||
//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto"
|
//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto"
|
||||||
|
26
go.mod
26
go.mod
@@ -1,23 +1,27 @@
|
|||||||
module go.unistack.org/micro-server-grpc/v3
|
module go.unistack.org/micro-server-grpc/v4
|
||||||
|
|
||||||
go 1.22.0
|
go 1.23.0
|
||||||
|
|
||||||
|
toolchain go1.24.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
go.unistack.org/micro/v3 v3.11.30
|
github.com/stretchr/testify v1.10.0
|
||||||
golang.org/x/net v0.33.0
|
go.unistack.org/micro/v4 v4.1.8
|
||||||
google.golang.org/grpc v1.69.2
|
golang.org/x/net v0.39.0
|
||||||
google.golang.org/protobuf v1.36.1
|
google.golang.org/grpc v1.72.0
|
||||||
|
google.golang.org/protobuf v1.36.6
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/ash3in/uuidv8 v1.2.0 // indirect
|
github.com/ash3in/uuidv8 v1.2.0 // indirect
|
||||||
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/kr/pretty v0.3.1 // indirect
|
|
||||||
github.com/matoous/go-nanoid v1.5.1 // indirect
|
github.com/matoous/go-nanoid v1.5.1 // indirect
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||||
golang.org/x/sys v0.28.0 // indirect
|
github.com/spf13/cast v1.7.1 // indirect
|
||||||
golang.org/x/text v0.21.0 // indirect
|
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
|
golang.org/x/sys v0.32.0 // indirect
|
||||||
|
golang.org/x/text v0.24.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
67
go.sum
67
go.sum
@@ -2,7 +2,10 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl
|
|||||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
||||||
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
||||||
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
||||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
|
||||||
|
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||||
|
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||||
@@ -19,36 +22,42 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
|||||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
|
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
|
||||||
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
|
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
|
||||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
|
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
|
||||||
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
|
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
||||||
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk=
|
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||||
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0=
|
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||||
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc=
|
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
|
||||||
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
|
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
|
||||||
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
|
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
|
||||||
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
|
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
|
||||||
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
|
||||||
go.unistack.org/micro/v3 v3.11.30 h1:XTLgZubSGzQL85IUMp1pTJnS1lP4eFwTZyelV/SzOMc=
|
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
|
||||||
go.unistack.org/micro/v3 v3.11.30/go.mod h1:fvOkXKs3wKHToWH6Mxy+aovEiDl2q4UlOCdVfJdziBU=
|
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
|
||||||
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
|
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
|
||||||
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
||||||
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
||||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
||||||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
go.unistack.org/micro/v4 v4.1.8 h1:bbqnQ7nzTYNIYt4HhRgmh/++qzwGJvlNbrTOoXzkQiU=
|
||||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
go.unistack.org/micro/v4 v4.1.8/go.mod h1:b4dr7RFlbpSfSsKCva9UuX4zLtkYQteEK+Uuac39qJE=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
|
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
|
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
||||||
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
|
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
||||||
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
|
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
|
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
||||||
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
||||||
|
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
|
||||||
|
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
|
||||||
|
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||||
|
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
280
grpc.go
280
grpc.go
@@ -10,7 +10,6 @@ import (
|
|||||||
"slices"
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@@ -19,18 +18,16 @@ import (
|
|||||||
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
|
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
|
||||||
|
|
||||||
// nolint: staticcheck
|
// nolint: staticcheck
|
||||||
"go.unistack.org/micro/v3/broker"
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/errors"
|
||||||
"go.unistack.org/micro/v3/errors"
|
"go.unistack.org/micro/v4/logger"
|
||||||
"go.unistack.org/micro/v3/logger"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/meter"
|
||||||
"go.unistack.org/micro/v3/meter"
|
"go.unistack.org/micro/v4/options"
|
||||||
"go.unistack.org/micro/v3/options"
|
"go.unistack.org/micro/v4/register"
|
||||||
"go.unistack.org/micro/v3/register"
|
"go.unistack.org/micro/v4/semconv"
|
||||||
"go.unistack.org/micro/v3/semconv"
|
"go.unistack.org/micro/v4/server"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/tracer"
|
||||||
msync "go.unistack.org/micro/v3/sync"
|
|
||||||
"go.unistack.org/micro/v3/tracer"
|
|
||||||
"golang.org/x/net/netutil"
|
"golang.org/x/net/netutil"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
@@ -46,13 +43,6 @@ const (
|
|||||||
DefaultContentType = "application/grpc"
|
DefaultContentType = "application/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
|
||||||
type ServerReflection struct {
|
|
||||||
srv *grpc.Server
|
|
||||||
s *serverReflectionServer
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
type streamWrapper struct {
|
type streamWrapper struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
grpc.ServerStream
|
grpc.ServerStream
|
||||||
@@ -69,18 +59,16 @@ type Server struct {
|
|||||||
handlers map[string]server.Handler
|
handlers map[string]server.Handler
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
exit chan chan error
|
exit chan chan error
|
||||||
wg *msync.WaitGroup
|
|
||||||
rsvc *register.Service
|
rsvc *register.Service
|
||||||
subscribers map[*subscriber][]broker.Subscriber
|
|
||||||
rpc *rServer
|
rpc *rServer
|
||||||
opts server.Options
|
opts server.Options
|
||||||
unknownHandler grpc.StreamHandler
|
unknownHandler grpc.StreamHandler
|
||||||
sync.RWMutex
|
mu sync.RWMutex
|
||||||
stateLive *atomic.Uint32
|
stateLive *atomic.Uint32
|
||||||
stateReady *atomic.Uint32
|
stateReady *atomic.Uint32
|
||||||
stateHealth *atomic.Uint32
|
stateHealth *atomic.Uint32
|
||||||
started bool
|
started bool
|
||||||
registered bool
|
registered bool
|
||||||
// reflection bool
|
// reflection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,7 +80,6 @@ func newServer(opts ...server.Option) *Server {
|
|||||||
serviceMap: make(map[string]*service),
|
serviceMap: make(map[string]*service),
|
||||||
},
|
},
|
||||||
handlers: make(map[string]server.Handler),
|
handlers: make(map[string]server.Handler),
|
||||||
subscribers: make(map[*subscriber][]broker.Subscriber),
|
|
||||||
exit: make(chan chan error),
|
exit: make(chan chan error),
|
||||||
stateLive: &atomic.Uint32{},
|
stateLive: &atomic.Uint32{},
|
||||||
stateReady: &atomic.Uint32{},
|
stateReady: &atomic.Uint32{},
|
||||||
@@ -104,25 +91,9 @@ func newServer(opts ...server.Option) *Server {
|
|||||||
return g
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
type grpcRouter struct {
|
|
||||||
h func(context.Context, server.Request, interface{}) error
|
|
||||||
m func(context.Context, server.Message) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
|
|
||||||
return r.m(ctx, msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
|
|
||||||
return r.h(ctx, req, rsp)
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
func (g *Server) configure(opts ...server.Option) error {
|
func (g *Server) configure(opts ...server.Option) error {
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&g.opts)
|
o(&g.opts)
|
||||||
@@ -215,8 +186,9 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var gmd map[string][]string
|
||||||
// get grpc metadata
|
// get grpc metadata
|
||||||
gmd, ok := gmetadata.FromIncomingContext(ctx)
|
gmd, ok = gmetadata.FromIncomingContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
gmd = gmetadata.MD{}
|
gmd = gmetadata.MD{}
|
||||||
}
|
}
|
||||||
@@ -249,25 +221,23 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
md := metadata.New(len(gmd))
|
md := metadata.Copy(gmd)
|
||||||
for k, v := range gmd {
|
|
||||||
md[k] = strings.Join(v, ", ")
|
md.Set("path", fullMethod)
|
||||||
}
|
md.Set("micro-server", "grpc")
|
||||||
md.Set("Path", fullMethod)
|
|
||||||
md.Set("Micro-Server", "grpc")
|
|
||||||
md.Set(metadata.HeaderEndpoint, methodName)
|
md.Set(metadata.HeaderEndpoint, methodName)
|
||||||
md.Set(metadata.HeaderService, serviceName)
|
md.Set(metadata.HeaderService, serviceName)
|
||||||
|
|
||||||
var td string
|
var td string
|
||||||
// timeout for server deadline
|
// timeout for server deadline
|
||||||
if v, ok := md.Get("timeout"); ok {
|
if v := md.Get("timeout"); v != nil {
|
||||||
md.Del("timeout")
|
md.Del("timeout")
|
||||||
td = v
|
td = v[0]
|
||||||
}
|
}
|
||||||
if v, ok := md.Get("Grpc-Timeout"); ok {
|
if v := md.Get("grpc-timeout"); v != nil {
|
||||||
md.Del("Grpc-Timeout")
|
md.Del("grpc-timeout")
|
||||||
td = v[:len(v)-1]
|
td = v[0][:len(v)-1]
|
||||||
switch v[len(v)-1:] {
|
switch v[0][len(v)-1:] {
|
||||||
case "S":
|
case "S":
|
||||||
td += "s"
|
td += "s"
|
||||||
case "M":
|
case "M":
|
||||||
@@ -286,16 +256,14 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
// get content type
|
// get content type
|
||||||
ct := DefaultContentType
|
ct := DefaultContentType
|
||||||
|
|
||||||
if ctype, ok := md.Get("content-type"); ok {
|
if ctype := md.Get("content-type"); ctype != nil {
|
||||||
ct = ctype
|
ct = ctype[0]
|
||||||
} else if ctype, ok := md.Get("x-content-type"); ok {
|
|
||||||
ct = ctype
|
|
||||||
md.Del("x-content-type")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// create new context
|
// create new context
|
||||||
ctx = metadata.NewIncomingContext(ctx, md)
|
ctx = metadata.NewIncomingContext(ctx, md)
|
||||||
ctx = metadata.NewOutgoingContext(ctx, metadata.New(0))
|
ctx = metadata.NewOutgoingContext(ctx, metadata.New(0))
|
||||||
|
ctx = context.WithValue(ctx, rspMetadataKey{}, &rspMetadataVal{m: metadata.New(0)})
|
||||||
|
|
||||||
stream = &streamWrapper{ctx, stream}
|
stream = &streamWrapper{ctx, stream}
|
||||||
|
|
||||||
@@ -323,7 +291,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
|||||||
|
|
||||||
// get peer from context
|
// get peer from context
|
||||||
if p, ok := peer.FromContext(ctx); ok {
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
md.Set("Remote", p.Addr.String())
|
md.Set("remote", p.Addr.String())
|
||||||
ctx = peer.NewContext(ctx, p)
|
ctx = peer.NewContext(ctx, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -430,13 +398,22 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
|||||||
statusDesc := ""
|
statusDesc := ""
|
||||||
// execute the handler
|
// execute the handler
|
||||||
appErr := fn(ctx, r, replyv.Interface())
|
appErr := fn(ctx, r, replyv.Interface())
|
||||||
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md := getResponseMetadata(ctx); len(md) > 0 {
|
||||||
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil {
|
if err = stream.SendHeader(md.AsHTTP2()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
|
var err error
|
||||||
var errStatus *status.Status
|
var errStatus *status.Status
|
||||||
|
var ok bool
|
||||||
|
errStatus, ok = status.FromError(appErr)
|
||||||
|
if ok {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
|
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
switch verr := appErr.(type) {
|
switch verr := appErr.(type) {
|
||||||
case *errors.Error:
|
case *errors.Error:
|
||||||
statusCode = microError(verr)
|
statusCode = microError(verr)
|
||||||
@@ -450,12 +427,10 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case (interface{ GRPCStatus() *status.Status }):
|
|
||||||
errStatus = verr.GRPCStatus()
|
|
||||||
default:
|
default:
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
config := g.opts
|
config := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
if config.Logger.V(logger.ErrorLevel) {
|
||||||
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
|
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
|
||||||
}
|
}
|
||||||
@@ -514,14 +489,22 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
|
|||||||
statusDesc := ""
|
statusDesc := ""
|
||||||
|
|
||||||
appErr := fn(ctx, r, ss)
|
appErr := fn(ctx, r, ss)
|
||||||
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md := getResponseMetadata(ctx); len(md) > 0 {
|
||||||
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil {
|
if err := stream.SendHeader(md.AsHTTP2()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
var err error
|
var err error
|
||||||
var errStatus *status.Status
|
var errStatus *status.Status
|
||||||
|
var ok bool
|
||||||
|
errStatus, ok = status.FromError(appErr)
|
||||||
|
if ok {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
|
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
|
||||||
|
return errStatus.Err()
|
||||||
|
}
|
||||||
switch verr := appErr.(type) {
|
switch verr := appErr.(type) {
|
||||||
case *errors.Error:
|
case *errors.Error:
|
||||||
statusCode = microError(verr)
|
statusCode = microError(verr)
|
||||||
@@ -551,25 +534,10 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
|
|||||||
return status.New(statusCode, statusDesc).Err()
|
return status.New(statusCode, statusDesc).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) newCodec(ct string) (codec.Codec, error) {
|
|
||||||
g.RLock()
|
|
||||||
defer g.RUnlock()
|
|
||||||
|
|
||||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
|
||||||
ct = ct[:idx]
|
|
||||||
}
|
|
||||||
|
|
||||||
if c, ok := g.opts.Codecs[ct]; ok {
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, codec.ErrUnknownContentType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Server) Options() server.Options {
|
func (g *Server) Options() server.Options {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
@@ -591,39 +559,11 @@ func (g *Server) Handle(h server.Handler) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
|
||||||
return newSubscriber(topic, sb, opts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Server) Subscribe(sb server.Subscriber) error {
|
|
||||||
sub, ok := sb.(*subscriber)
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
|
||||||
}
|
|
||||||
if len(sub.handlers) == 0 {
|
|
||||||
return fmt.Errorf("invalid subscriber: no handler functions")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := server.ValidateSubscriber(sb); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
g.Lock()
|
|
||||||
if _, ok = g.subscribers[sub]; ok {
|
|
||||||
g.Unlock()
|
|
||||||
return fmt.Errorf("subscriber %v already exists", sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
g.subscribers[sub] = nil
|
|
||||||
g.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Server) Register() error {
|
func (g *Server) Register() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
rsvc := g.rsvc
|
rsvc := g.rsvc
|
||||||
config := g.opts
|
config := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
// if service already filled, reuse it and return early
|
// if service already filled, reuse it and return early
|
||||||
if rsvc != nil {
|
if rsvc != nil {
|
||||||
@@ -638,7 +578,7 @@ func (g *Server) Register() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
// Maps are ordered randomly, sort the keys for consistency
|
// Maps are ordered randomly, sort the keys for consistency
|
||||||
handlerList := make([]string, 0, len(g.handlers))
|
handlerList := make([]string, 0, len(g.handlers))
|
||||||
for n := range g.handlers {
|
for n := range g.handlers {
|
||||||
@@ -648,20 +588,11 @@ func (g *Server) Register() error {
|
|||||||
|
|
||||||
sort.Strings(handlerList)
|
sort.Strings(handlerList)
|
||||||
|
|
||||||
subscriberList := make([]*subscriber, 0, len(g.subscribers))
|
g.mu.RUnlock()
|
||||||
for e := range g.subscribers {
|
|
||||||
// Only advertise non internal subscribers
|
|
||||||
subscriberList = append(subscriberList, e)
|
|
||||||
}
|
|
||||||
sort.Slice(subscriberList, func(i, j int) bool {
|
|
||||||
return subscriberList[i].topic > subscriberList[j].topic
|
|
||||||
})
|
|
||||||
|
|
||||||
g.RUnlock()
|
g.mu.RLock()
|
||||||
|
|
||||||
g.RLock()
|
|
||||||
registered := g.registered
|
registered := g.registered
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
if !registered {
|
if !registered {
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
@@ -679,8 +610,8 @@ func (g *Server) Register() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
defer g.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
|
||||||
g.registered = true
|
g.registered = true
|
||||||
g.rsvc = service
|
g.rsvc = service
|
||||||
@@ -691,9 +622,9 @@ func (g *Server) Register() error {
|
|||||||
func (g *Server) Deregister() error {
|
func (g *Server) Deregister() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
config := g.opts
|
config := g.opts
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
service, err := server.NewRegisterService(g)
|
service, err := server.NewRegisterService(g)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -708,47 +639,27 @@ func (g *Server) Deregister() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.rsvc = nil
|
g.rsvc = nil
|
||||||
|
|
||||||
if !g.registered {
|
if !g.registered {
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
g.registered = false
|
g.registered = false
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
g.mu.Unlock()
|
||||||
for sb, subs := range g.subscribers {
|
|
||||||
for _, sub := range subs {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(s broker.Subscriber) {
|
|
||||||
defer wg.Done()
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
|
|
||||||
}
|
|
||||||
if err := s.Unsubscribe(g.opts.Context); err != nil {
|
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
|
||||||
config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(sub)
|
|
||||||
}
|
|
||||||
g.subscribers[sb] = nil
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
g.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) Start() error {
|
func (g *Server) Start() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
if g.started {
|
if g.started {
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
config := g.Options()
|
config := g.Options()
|
||||||
|
|
||||||
@@ -778,27 +689,12 @@ func (g *Server) Start() error {
|
|||||||
if config.Logger.V(logger.InfoLevel) {
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
|
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
|
||||||
}
|
}
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.opts.Address = ts.Addr().String()
|
g.opts.Address = ts.Addr().String()
|
||||||
if len(g.opts.Advertise) == 0 {
|
if len(g.opts.Advertise) == 0 {
|
||||||
g.opts.Advertise = ts.Addr().String()
|
g.opts.Advertise = ts.Addr().String()
|
||||||
}
|
}
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
// only connect if we're subscribed
|
|
||||||
if len(g.subscribers) > 0 {
|
|
||||||
// connect to the broker
|
|
||||||
if err = config.Broker.Connect(config.Context); err != nil {
|
|
||||||
if config.Logger.V(logger.ErrorLevel) {
|
|
||||||
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// use RegisterCheck func before register
|
// use RegisterCheck func before register
|
||||||
// nolint: nestif
|
// nolint: nestif
|
||||||
@@ -815,10 +711,6 @@ func (g *Server) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = g.subscribe(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// micro: go ts.Accept(s.accept)
|
// micro: go ts.Accept(s.accept)
|
||||||
go func() {
|
go func() {
|
||||||
if err = g.srv.Serve(ts); err != nil {
|
if err = g.srv.Serve(ts); err != nil {
|
||||||
@@ -853,9 +745,9 @@ func (g *Server) Start() error {
|
|||||||
select {
|
select {
|
||||||
// register self on interval
|
// register self on interval
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
registered := g.registered
|
registered := g.registered
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
rerr := g.opts.RegisterCheck(g.opts.Context)
|
rerr := g.opts.RegisterCheck(g.opts.Context)
|
||||||
// nolint: nestif
|
// nolint: nestif
|
||||||
if rerr != nil && registered {
|
if rerr != nil && registered {
|
||||||
@@ -932,29 +824,29 @@ func (g *Server) Start() error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// mark the server as started
|
// mark the server as started
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.started = true
|
g.started = true
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Server) Stop() error {
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.mu.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
g.RUnlock()
|
g.mu.RUnlock()
|
||||||
|
|
||||||
ch := make(chan error)
|
ch := make(chan error)
|
||||||
g.exit <- ch
|
g.exit <- ch
|
||||||
|
|
||||||
err := <-ch
|
err := <-ch
|
||||||
g.Lock()
|
g.mu.Lock()
|
||||||
g.rsvc = nil
|
g.rsvc = nil
|
||||||
g.started = false
|
g.started = false
|
||||||
g.Unlock()
|
g.mu.Unlock()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@@ -3,7 +3,7 @@ package grpc
|
|||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
type rpcHandler struct {
|
type rpcHandler struct {
|
||||||
|
47
metadata.go
Normal file
47
metadata.go
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
rspMetadataKey struct{}
|
||||||
|
rspMetadataVal struct {
|
||||||
|
m metadata.Metadata
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// AppendResponseMetadata adds metadata entries to metadata.Metadata stored in the context.
|
||||||
|
// It expects the context to contain a *rspMetadataVal value under the rspMetadataKey{} key.
|
||||||
|
// If the value is missing or invalid, the function does nothing.
|
||||||
|
//
|
||||||
|
// Note: this function is not thread-safe. Synchronization is required if used from multiple goroutines.
|
||||||
|
func AppendResponseMetadata(ctx context.Context, md metadata.Metadata) {
|
||||||
|
if md == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val, ok := ctx.Value(rspMetadataKey{}).(*rspMetadataVal)
|
||||||
|
if !ok || val == nil || val.m == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, values := range md {
|
||||||
|
val.m.Append(key, values...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getResponseMetadata retrieves the metadata.Metadata stored in the context.
|
||||||
|
//
|
||||||
|
// Note: this function is not thread-safe. Synchronization is required if used from multiple goroutines.
|
||||||
|
// If you plan to modify the returned metadata, make a full copy to avoid affecting shared state.
|
||||||
|
func getResponseMetadata(ctx context.Context) metadata.Metadata {
|
||||||
|
val, ok := ctx.Value(rspMetadataKey{}).(*rspMetadataVal)
|
||||||
|
if !ok || val == nil || val.m == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return val.m
|
||||||
|
}
|
136
metadata_test.go
Normal file
136
metadata_test.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.unistack.org/micro/v4/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAppendResponseMetadata(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
ctx context.Context
|
||||||
|
md metadata.Metadata
|
||||||
|
expected context.Context
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "nil metadata",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
md: nil,
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty metadata",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
md: metadata.Metadata{},
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context without response metadata key",
|
||||||
|
ctx: context.Background(),
|
||||||
|
md: metadata.Pairs("key1", "val1"),
|
||||||
|
expected: context.Background(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context with nil response metadata value",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, nil),
|
||||||
|
md: metadata.Pairs("key1", "val1"),
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, nil),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context with incorrect type in response metadata value",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, struct{}{}),
|
||||||
|
md: metadata.Pairs("key1", "val1"),
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, struct{}{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context with response metadata value, but nil metadata",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
|
||||||
|
md: metadata.Pairs("key1", "val1"),
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "basic metadata append",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
md: metadata.Pairs("key1", "val1"),
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
|
||||||
|
m: metadata.Metadata{
|
||||||
|
"key1": []string{"val1"},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple values for same key",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
md: metadata.Pairs("key1", "val1", "key1", "val2"),
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
|
||||||
|
m: metadata.Metadata{
|
||||||
|
"key1": []string{"val1", "val2"},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple values for different keys",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
|
||||||
|
md: metadata.Pairs("key1", "val1", "key1", "val2", "key2", "val3", "key2", "val4", "key3", "val5"),
|
||||||
|
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
|
||||||
|
m: metadata.Metadata{
|
||||||
|
"key1": []string{"val1", "val2"},
|
||||||
|
"key2": []string{"val3", "val4"},
|
||||||
|
"key3": []string{"val5"},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
AppendResponseMetadata(tt.ctx, tt.md)
|
||||||
|
require.Equal(t, tt.expected, tt.ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetResponseMetadata(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
ctx context.Context
|
||||||
|
expected metadata.Metadata
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "context without response metadata key",
|
||||||
|
ctx: context.Background(),
|
||||||
|
expected: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context with nil response metadata value",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, nil),
|
||||||
|
expected: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context with incorrect type in response metadata value",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &struct{}{}),
|
||||||
|
expected: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "context with response metadata value, but nil metadata",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
|
||||||
|
expected: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid metadata",
|
||||||
|
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
|
||||||
|
m: metadata.Pairs("key1", "value1"),
|
||||||
|
}),
|
||||||
|
expected: metadata.Metadata{"key1": {"value1"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
require.Equal(t, tt.expected, getResponseMetadata(tt.ctx))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@@ -3,7 +3,7 @@ package grpc
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
)
|
)
|
||||||
|
@@ -4,8 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
_ "go.unistack.org/micro-server-grpc/v3/proto"
|
"go.unistack.org/micro/v4/server"
|
||||||
"go.unistack.org/micro/v3/server"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/protobuf/reflect/protoreflect"
|
"google.golang.org/protobuf/reflect/protoreflect"
|
||||||
"google.golang.org/protobuf/reflect/protoregistry"
|
"google.golang.org/protobuf/reflect/protoregistry"
|
||||||
|
@@ -1,9 +1,9 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ server.Request = &rpcRequest{}
|
var _ server.Request = &rpcRequest{}
|
||||||
|
@@ -1,9 +1,9 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v3/codec"
|
"go.unistack.org/micro/v4/codec"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v4/metadata"
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ server.Response = &rpcResponse{}
|
var _ server.Response = &rpcResponse{}
|
||||||
|
42
server.go
42
server.go
@@ -14,7 +14,7 @@ import (
|
|||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Precompute the reflect type for error. Can't use error directly
|
// Precompute the reflect type for error. Can't use error directly
|
||||||
@@ -47,8 +47,8 @@ type rServer struct {
|
|||||||
|
|
||||||
// Is this an exported - upper case - name?
|
// Is this an exported - upper case - name?
|
||||||
func isExported(name string) bool {
|
func isExported(name string) bool {
|
||||||
rune, _ := utf8.DecodeRuneInString(name)
|
r, _ := utf8.DecodeRuneInString(name)
|
||||||
return unicode.IsUpper(rune)
|
return unicode.IsUpper(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Is this type exported or a builtin?
|
// Is this type exported or a builtin?
|
||||||
@@ -123,43 +123,43 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) {
|
|||||||
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
|
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (server *rServer) register(rcvr interface{}) error {
|
func (s *rServer) register(rcvr interface{}) error {
|
||||||
server.mu.Lock()
|
s.mu.Lock()
|
||||||
defer server.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
if server.serviceMap == nil {
|
if s.serviceMap == nil {
|
||||||
server.serviceMap = make(map[string]*service)
|
s.serviceMap = make(map[string]*service)
|
||||||
}
|
}
|
||||||
s := &service{}
|
srv := &service{}
|
||||||
s.typ = reflect.TypeOf(rcvr)
|
srv.typ = reflect.TypeOf(rcvr)
|
||||||
s.rcvr = reflect.ValueOf(rcvr)
|
srv.rcvr = reflect.ValueOf(rcvr)
|
||||||
sname := reflect.Indirect(s.rcvr).Type().Name()
|
sname := reflect.Indirect(srv.rcvr).Type().Name()
|
||||||
if sname == "" {
|
if sname == "" {
|
||||||
return fmt.Errorf("rpc: no service name for type %v", s.typ.String())
|
return fmt.Errorf("rpc: no service name for type %v", srv.typ.String())
|
||||||
}
|
}
|
||||||
if !isExported(sname) {
|
if !isExported(sname) {
|
||||||
return fmt.Errorf("rpc Register: type %s is not exported", sname)
|
return fmt.Errorf("rpc Register: type %s is not exported", sname)
|
||||||
}
|
}
|
||||||
if _, present := server.serviceMap[sname]; present {
|
if _, present := s.serviceMap[sname]; present {
|
||||||
return fmt.Errorf("rpc: service already defined: %s", sname)
|
return fmt.Errorf("rpc: service already defined: %s", sname)
|
||||||
}
|
}
|
||||||
s.name = sname
|
srv.name = sname
|
||||||
s.method = make(map[string]*methodType)
|
srv.method = make(map[string]*methodType)
|
||||||
|
|
||||||
// Install the methods
|
// Install the methods
|
||||||
for m := 0; m < s.typ.NumMethod(); m++ {
|
for m := 0; m < srv.typ.NumMethod(); m++ {
|
||||||
method := s.typ.Method(m)
|
method := srv.typ.Method(m)
|
||||||
mt, err := prepareEndpoint(method)
|
mt, err := prepareEndpoint(method)
|
||||||
if mt != nil && err == nil {
|
if mt != nil && err == nil {
|
||||||
s.method[method.Name] = mt
|
srv.method[method.Name] = mt
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.method) == 0 {
|
if len(srv.method) == 0 {
|
||||||
return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname)
|
return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname)
|
||||||
}
|
}
|
||||||
server.serviceMap[s.name] = s
|
s.serviceMap[srv.name] = srv
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -3,7 +3,7 @@ package grpc
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/server"
|
"go.unistack.org/micro/v4/server"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
258
subscriber.go
258
subscriber.go
@@ -1,258 +0,0 @@
|
|||||||
package grpc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/broker"
|
|
||||||
"go.unistack.org/micro/v3/codec"
|
|
||||||
"go.unistack.org/micro/v3/logger"
|
|
||||||
"go.unistack.org/micro/v3/metadata"
|
|
||||||
"go.unistack.org/micro/v3/options"
|
|
||||||
"go.unistack.org/micro/v3/server"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ server.Message = &rpcMessage{}
|
|
||||||
|
|
||||||
type rpcMessage struct {
|
|
||||||
payload interface{}
|
|
||||||
codec codec.Codec
|
|
||||||
header metadata.Metadata
|
|
||||||
topic string
|
|
||||||
contentType string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) ContentType() string {
|
|
||||||
return r.contentType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Topic() string {
|
|
||||||
return r.topic
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Body() interface{} {
|
|
||||||
return r.payload
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Header() metadata.Metadata {
|
|
||||||
return r.header
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rpcMessage) Codec() codec.Codec {
|
|
||||||
return r.codec
|
|
||||||
}
|
|
||||||
|
|
||||||
type handler struct {
|
|
||||||
reqType reflect.Type
|
|
||||||
ctxType reflect.Type
|
|
||||||
method reflect.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
type subscriber struct {
|
|
||||||
topic string
|
|
||||||
rcvr reflect.Value
|
|
||||||
typ reflect.Type
|
|
||||||
subscriber interface{}
|
|
||||||
handlers []*handler
|
|
||||||
opts server.SubscriberOptions
|
|
||||||
}
|
|
||||||
|
|
||||||
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
|
||||||
options := server.NewSubscriberOptions(opts...)
|
|
||||||
|
|
||||||
var handlers []*handler
|
|
||||||
|
|
||||||
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
|
|
||||||
h := &handler{
|
|
||||||
method: reflect.ValueOf(sub),
|
|
||||||
}
|
|
||||||
|
|
||||||
switch typ.NumIn() {
|
|
||||||
case 1:
|
|
||||||
h.reqType = typ.In(0)
|
|
||||||
case 2:
|
|
||||||
h.ctxType = typ.In(0)
|
|
||||||
h.reqType = typ.In(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
handlers = append(handlers, h)
|
|
||||||
|
|
||||||
} else {
|
|
||||||
for m := 0; m < typ.NumMethod(); m++ {
|
|
||||||
method := typ.Method(m)
|
|
||||||
h := &handler{
|
|
||||||
method: method.Func,
|
|
||||||
}
|
|
||||||
|
|
||||||
switch method.Type.NumIn() {
|
|
||||||
case 2:
|
|
||||||
h.reqType = method.Type.In(1)
|
|
||||||
case 3:
|
|
||||||
h.ctxType = method.Type.In(1)
|
|
||||||
h.reqType = method.Type.In(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
handlers = append(handlers, h)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &subscriber{
|
|
||||||
rcvr: reflect.ValueOf(sub),
|
|
||||||
typ: reflect.TypeOf(sub),
|
|
||||||
topic: topic,
|
|
||||||
subscriber: sub,
|
|
||||||
handlers: handlers,
|
|
||||||
opts: options,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
|
||||||
return func(p broker.Event) (err error) {
|
|
||||||
msg := p.Message()
|
|
||||||
// if we don't have headers, create empty map
|
|
||||||
if msg.Header == nil {
|
|
||||||
msg.Header = make(map[string]string)
|
|
||||||
}
|
|
||||||
|
|
||||||
ct := msg.Header["Content-Type"]
|
|
||||||
if len(ct) == 0 {
|
|
||||||
msg.Header["Content-Type"] = DefaultContentType
|
|
||||||
ct = DefaultContentType
|
|
||||||
}
|
|
||||||
cf, err := g.newCodec(ct)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
hdr := make(map[string]string, len(msg.Header))
|
|
||||||
for k, v := range msg.Header {
|
|
||||||
hdr[k] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
|
|
||||||
|
|
||||||
results := make(chan error, len(sb.handlers))
|
|
||||||
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
handler := sb.handlers[i]
|
|
||||||
|
|
||||||
var isVal bool
|
|
||||||
var req reflect.Value
|
|
||||||
|
|
||||||
if handler.reqType.Kind() == reflect.Ptr {
|
|
||||||
req = reflect.New(handler.reqType.Elem())
|
|
||||||
} else {
|
|
||||||
req = reflect.New(handler.reqType)
|
|
||||||
isVal = true
|
|
||||||
}
|
|
||||||
if isVal {
|
|
||||||
req = req.Elem()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fn := func(ctx context.Context, msg server.Message) error {
|
|
||||||
var vals []reflect.Value
|
|
||||||
if sb.typ.Kind() != reflect.Func {
|
|
||||||
vals = append(vals, sb.rcvr)
|
|
||||||
}
|
|
||||||
if handler.ctxType != nil {
|
|
||||||
vals = append(vals, reflect.ValueOf(ctx))
|
|
||||||
}
|
|
||||||
|
|
||||||
vals = append(vals, reflect.ValueOf(msg.Body()))
|
|
||||||
|
|
||||||
returnValues := handler.method.Call(vals)
|
|
||||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
|
||||||
return rerr.(error)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
opts.Hooks.EachPrev(func(hook options.Hook) {
|
|
||||||
if h, ok := hook.(server.HookSubHandler); ok {
|
|
||||||
fn = h(fn)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
if g.wg != nil {
|
|
||||||
g.wg.Add(1)
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
if g.wg != nil {
|
|
||||||
defer g.wg.Done()
|
|
||||||
}
|
|
||||||
cerr := fn(ctx, &rpcMessage{
|
|
||||||
topic: sb.topic,
|
|
||||||
contentType: ct,
|
|
||||||
payload: req.Interface(),
|
|
||||||
header: msg.Header,
|
|
||||||
})
|
|
||||||
results <- cerr
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
var errors []string
|
|
||||||
for i := 0; i < len(sb.handlers); i++ {
|
|
||||||
if rerr := <-results; rerr != nil {
|
|
||||||
errors = append(errors, rerr.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(errors) > 0 {
|
|
||||||
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Topic() string {
|
|
||||||
return s.topic
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Subscriber() interface{} {
|
|
||||||
return s.subscriber
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *subscriber) Options() server.SubscriberOptions {
|
|
||||||
return s.opts
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Server) subscribe() error {
|
|
||||||
config := g.opts
|
|
||||||
subCtx := config.Context
|
|
||||||
|
|
||||||
for sb := range g.subscribers {
|
|
||||||
|
|
||||||
if cx := sb.Options().Context; cx != nil {
|
|
||||||
subCtx = cx
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := []broker.SubscribeOption{
|
|
||||||
broker.SubscribeContext(subCtx),
|
|
||||||
broker.SubscribeAutoAck(sb.Options().AutoAck),
|
|
||||||
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
|
|
||||||
}
|
|
||||||
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Info(config.Context, "subscribing to topic: "+sb.Topic())
|
|
||||||
}
|
|
||||||
|
|
||||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), g.createSubHandler(sb, config), opts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
Reference in New Issue
Block a user