Compare commits

...

35 Commits

Author SHA1 Message Date
2b3c413adc fixup grpc error codes in unary and stream processing
All checks were successful
sync / sync (push) Successful in 26s
coverage / build (push) Successful in 1m51s
test / test (push) Successful in 2m40s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-08 11:16:43 +03:00
20fb19fee9 changed embedded mutex to private field (#278)
Some checks failed
coverage / build (push) Successful in 2m26s
test / test (push) Failing after 16m18s
sync / sync (push) Successful in 8s
2025-05-14 01:25:57 +03:00
8e3c56f4ed update ci (#277)
All checks were successful
sync / sync (push) Successful in 10s
2025-05-05 19:19:33 +03:00
fcbae6f94a added commit hash check to avoid unnecessary repository cloning (#275)
All checks were successful
sync / sync (push) Successful in 10s
2025-05-05 13:44:43 +03:00
2f818d389b Merge branch 'v4' of https://git.unistack.org/unistack-org/micro-server-grpc into v4
All checks were successful
sync / sync (push) Successful in 14s
2025-05-04 15:02:46 +03:00
d55cb59531 fixup sync
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-04 15:02:02 +03:00
8c416b10ef Обновить .github/workflows/job_sync.yml
Some checks failed
sync / sync (push) Has been cancelled
2025-05-02 18:55:11 +03:00
d3c2ae5f54 convert headers to HTTP/2 format before SendHeader() (#273)
All checks were successful
coverage / build (push) Has been skipped
test / test (push) Successful in 2m19s
2025-05-02 18:51:30 +03:00
3bb8ec0753 Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Has been skipped
2025-05-01 22:07:43 +03:00
27bca35eb6 Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Has been skipped
2025-05-01 22:01:33 +03:00
498218912c Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Successful in 23s
2025-05-01 21:59:18 +03:00
bd04f5b9cb Обновить .github/workflows/job_sync.yml 2025-05-01 21:58:56 +03:00
dc976006ad Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Has been skipped
2025-05-01 21:05:02 +03:00
934ebf6c0a fix uninitialized response metadata for incoming context (#264)
All checks were successful
coverage / build (push) Has been skipped
test / test (push) Successful in 2m8s
sync / sync (push) Successful in 18s
2025-05-01 20:42:59 +03:00
6d8fce53dd update ci (#265) 2025-05-01 20:42:52 +03:00
f12f3fb2c2 Обновить .github/workflows/job_coverage.yml
All checks were successful
sync / sync (push) Successful in 15s
2025-05-01 20:34:33 +03:00
vtolstov
5a755437c9 Apply Code Coverage Badge 2025-04-29 15:43:08 +00:00
05db1f3dae [v4] fix ci pipeline (#260)
All checks were successful
coverage / build (push) Successful in 1m54s
test / test (push) Successful in 2m42s
* attempt to fix coverage job

* Apply Code Coverage Badge

---------

Co-authored-by: pugnack <pugnack@users.noreply.github.com>
2025-04-29 18:39:27 +03:00
e4ba134fa6 [v4] breaking change: modify API for working with response metadata (#255)
Some checks failed
coverage / build (push) Failing after 40s
test / test (push) Successful in 3m3s
sync / sync (push) Successful in 17s
* implement functions to append/get metadata

* сhanged behavior to return nil instead of empty metadata for getResponseMetadata()

* removed metadata copy when passing to gRPC headers

---------

Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 18:34:11 +03:00
8a85989b79 update all
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 18:30:08 +03:00
56185faabe fixup coverage job
All checks were successful
sync / sync (push) Successful in 26s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 14:16:34 +03:00
823b2bdc52 fixup for latest micro
Some checks failed
coverage / build (push) Failing after 1m21s
test / test (push) Successful in 4m8s
sync / sync (push) Failing after 8s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 14:08:59 +03:00
48906c5612 improve sync
Some checks failed
coverage / build (push) Failing after 5m36s
test / test (push) Successful in 11m32s
sync / sync (push) Failing after 41s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 21:38:23 +03:00
23a1ff424e prepare v4 (need swap target!) (#195)
All checks were successful
test / test (push) Successful in 1m53s
move to v4 micro

Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Reviewed-on: #195
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2025-03-02 21:13:25 +03:00
134f7374aa Merge pull request 'update for latest micro' (#194) from register into v3
All checks were successful
test / test (push) Successful in 2m59s
Reviewed-on: #194
2024-12-27 01:39:00 +03:00
828f211a4e Merge branch 'v3' into register
Some checks failed
lint / lint (pull_request) Failing after 9m10s
test / test (pull_request) Successful in 11m57s
2024-12-27 01:25:45 +03:00
75c9d467e7 update for latest micro
Some checks failed
prbuild / test (pull_request) Failing after 2m37s
prbuild / lint (pull_request) Successful in 5m19s
codeql / analyze (go) (pull_request) Failing after 10m32s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 01:25:15 +03:00
699358383d Merge pull request 'update call Hooks' (#193) from devstigneev/micro-server-grpc:v3 into v3
All checks were successful
test / test (push) Successful in 2m27s
Reviewed-on: #193
2024-12-22 22:53:44 +03:00
82b95f8605 update call Hooks
All checks were successful
test / test (pull_request) Successful in 1m48s
lint / lint (pull_request) Successful in 1m23s
2024-12-20 21:37:01 +03:00
8ec0a4ef77 Update actions (#191)
All checks were successful
test / test (push) Successful in 1m42s
Update actions:
- build -> job_test
- pr -> job_lint

Co-authored-by: Aleksandr Tolstikhin <atolstikhin@mtsbank.ru>
Co-authored-by: Василий Толстов <v.tolstov@unistack.org>
Reviewed-on: #191
Co-authored-by: Александр Толстихин <tolstihin1996@mail.ru>
Co-committed-by: Александр Толстихин <tolstihin1996@mail.ru>
2024-12-11 01:37:14 +03:00
fb8e9ccb75 fixup lint and tests
Some checks failed
build / test (push) Failing after 28s
build / lint (push) Failing after 14m27s
codeql / analyze (go) (push) Failing after 14m22s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-11 01:34:24 +03:00
129f02b3bc fixup lint and tests
Some checks failed
build / test (push) Failing after 1m33s
build / lint (push) Successful in 2m15s
codeql / analyze (go) (push) Failing after 3m38s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-11 01:29:39 +03:00
55cdcfb3af rename workflow dir
Some checks failed
build / test (push) Failing after 4m53s
build / lint (push) Successful in 9m29s
codeql / analyze (go) (push) Failing after 14m56s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-07 00:52:51 +03:00
ac5e6dffa0 create outgoing metadata automatic on request
Some checks failed
build / test (push) Failing after 1s
build / lint (push) Failing after 1s
codeql / analyze (go) (push) Failing after 0s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-04 11:32:23 +03:00
66756f74dd update micro, add Health/Live/Ready checks
Some checks failed
codeql / analyze (go) (push) Failing after 55s
build / test (push) Failing after 4m55s
build / lint (push) Successful in 9m33s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-02 15:55:55 +03:00
35 changed files with 720 additions and 1398 deletions

View File

@@ -1,6 +1,6 @@
---
name: Bug report
about: For reporting bugs in go-micro
about: For reporting bugs in micro
title: "[BUG]"
labels: ''
assignees: ''
@@ -16,9 +16,3 @@ assignees: ''
**How to reproduce the bug:**
If possible, please include a minimal code snippet here.
**Environment:**
Go Version: please paste `go version` output here
```
please paste `go env` output here
```

View File

@@ -1,6 +1,6 @@
---
name: Feature request / Enhancement
about: If you have a need not served by go-micro
about: If you have a need not served by micro
title: "[FEATURE]"
labels: ''
assignees: ''

View File

@@ -1,14 +1,8 @@
---
name: Question
about: Ask a question about go-micro
about: Ask a question about micro
title: ''
labels: ''
assignees: ''
---
Before asking, please check if your question has already been answered:
1. Check the documentation - https://micro.mu/docs/
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
3. Search existing issues

28
.github/autoapprove.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
"chmod +x ./tea",
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
"./tea pr --repo ${{ github.event.repository.name }}"
]
if: github.actor == 'vtolstov'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -1,19 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

View File

@@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@@ -1,47 +0,0 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@@ -1,78 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@@ -1,27 +0,0 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

53
.github/workflows/job_coverage.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
jobs:
build:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: test coverage
run: |
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out
- name: coverage badge
uses: tj-actions/coverage-badge-go@v2
with:
green: 80
filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: false
skip_checkout: false
file_pattern: ./README.md
- name: push
if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master
with:
github_token: ${{ github.token }}
branch: ${{ github.ref }}

29
.github/workflows/job_lint.yml vendored Normal file
View File

@@ -0,0 +1,29 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: golangci/golangci-lint-action@v6
with:
version: 'latest'

94
.github/workflows/job_sync.yml vendored Normal file
View File

@@ -0,0 +1,94 @@
name: sync
on:
schedule:
- cron: '*/5 * * * *'
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
sync:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: init
run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: check master
id: check_master
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync master
if: steps.check_master.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream master
git push upstream master --progress
git push origin master --progress
cd ../
rm -rf repo
- name: check v3
id: check_v3
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v3
if: steps.check_v3.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v3
git push upstream v3 --progress
git push origin v3 --progress
cd ../
rm -rf repo
- name: check v4
id: check_v4
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v4
if: steps.check_v4.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v4
git push upstream v4 --progress
git push origin v4 --progress
cd ../
rm -rf repo

31
.github/workflows/job_test.yml vendored Normal file
View File

@@ -0,0 +1,31 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

50
.github/workflows/job_tests.yml vendored Normal file
View File

@@ -0,0 +1,50 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: ${{ github.workspace }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: ${{ github.workspace }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: ${{ github.workspace }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

View File

@@ -1,47 +0,0 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@@ -1,44 +1,5 @@
run:
concurrency: 4
deadline: 5m
concurrency: 8
timeout: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

4
README.md Normal file
View File

@@ -0,0 +1,4 @@
# GRPC Server
![Coverage](https://img.shields.io/badge/Coverage-3.5%25-red)
This plugin is a grpc server for micro.

View File

@@ -1,7 +1,7 @@
package grpc
import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v4/codec"
"google.golang.org/grpc/encoding"
)

View File

@@ -6,7 +6,7 @@ import (
"net/http"
"os"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v4/errors"
"google.golang.org/grpc/codes"
)

View File

@@ -1,4 +1,4 @@
package grpc
//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto"
//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto"

31
go.mod
View File

@@ -1,20 +1,27 @@
module go.unistack.org/micro-server-grpc/v3
module go.unistack.org/micro-server-grpc/v4
go 1.22
go 1.23.0
toolchain go1.23.1
toolchain go1.24.2
require (
github.com/golang/protobuf v1.5.4
go.unistack.org/micro/v3 v3.10.97
golang.org/x/net v0.30.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
github.com/stretchr/testify v1.10.0
go.unistack.org/micro/v4 v4.1.8
golang.org/x/net v0.39.0
google.golang.org/grpc v1.72.0
google.golang.org/protobuf v1.36.6
)
require (
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
github.com/ash3in/uuidv8 v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/matoous/go-nanoid v1.5.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/spf13/cast v1.7.1 // indirect
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

77
go.sum
View File

@@ -1,20 +1,65 @@
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
go.unistack.org/micro/v3 v3.10.97 h1:8l7fv+i06/PjPrBBhRC/ZQkWGIOuHPg3jJN0vktYE78=
go.unistack.org/micro/v3 v3.10.97/go.mod h1:YzMldzHN9Ei+zy5t/Psu7RUWDZwUfrNYiStSQtTz90g=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
go.unistack.org/micro/v4 v4.1.8 h1:bbqnQ7nzTYNIYt4HhRgmh/++qzwGJvlNbrTOoXzkQiU=
go.unistack.org/micro/v4 v4.1.8/go.mod h1:b4dr7RFlbpSfSsKCva9UuX4zLtkYQteEK+Uuac39qJE=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

326
grpc.go
View File

@@ -10,27 +10,24 @@ import (
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
greflection "google.golang.org/grpc/reflection"
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
// nolint: staticcheck
oldproto "github.com/golang/protobuf/proto"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v4/tracer"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -39,19 +36,13 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)
const (
DefaultContentType = "application/grpc"
)
/*
type ServerReflection struct {
srv *grpc.Server
s *serverReflectionServer
}
*/
type streamWrapper struct {
ctx context.Context
grpc.ServerStream
@@ -68,16 +59,17 @@ type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *msync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
sync.RWMutex
started bool
registered bool
reflection bool
mu sync.RWMutex
stateLive *atomic.Uint32
stateReady *atomic.Uint32
stateHealth *atomic.Uint32
started bool
registered bool
// reflection bool
}
func newServer(opts ...server.Option) *Server {
@@ -88,8 +80,10 @@ func newServer(opts ...server.Option) *Server {
serviceMap: make(map[string]*service),
},
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{},
stateHealth: &atomic.Uint32{},
}
g.opts.Meter = g.opts.Meter.Clone(meter.Labels("type", "grpc"))
@@ -97,25 +91,9 @@ func newServer(opts ...server.Option) *Server {
return g
}
/*
type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
}
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
return r.m(ctx, msg)
}
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
return r.h(ctx, req, rsp)
}
*/
func (g *Server) configure(opts ...server.Option) error {
g.Lock()
defer g.Unlock()
g.mu.Lock()
defer g.mu.Unlock()
for _, o := range opts {
o(&g.opts)
@@ -208,8 +186,9 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
return status.Errorf(codes.Internal, "method does not exist in context")
}
var gmd map[string][]string
// get grpc metadata
gmd, ok := gmetadata.FromIncomingContext(ctx)
gmd, ok = gmetadata.FromIncomingContext(ctx)
if !ok {
gmd = gmetadata.MD{}
}
@@ -242,25 +221,23 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
}()
}
md := metadata.New(len(gmd))
for k, v := range gmd {
md[k] = strings.Join(v, ", ")
}
md.Set("Path", fullMethod)
md.Set("Micro-Server", "grpc")
md := metadata.Copy(gmd)
md.Set("path", fullMethod)
md.Set("micro-server", "grpc")
md.Set(metadata.HeaderEndpoint, methodName)
md.Set(metadata.HeaderService, serviceName)
var td string
// timeout for server deadline
if v, ok := md.Get("timeout"); ok {
if v := md.Get("timeout"); v != nil {
md.Del("timeout")
td = v
td = v[0]
}
if v, ok := md.Get("Grpc-Timeout"); ok {
md.Del("Grpc-Timeout")
td = v[:len(v)-1]
switch v[len(v)-1:] {
if v := md.Get("grpc-timeout"); v != nil {
md.Del("grpc-timeout")
td = v[0][:len(v)-1]
switch v[0][len(v)-1:] {
case "S":
td += "s"
case "M":
@@ -279,15 +256,14 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get content type
ct := DefaultContentType
if ctype, ok := md.Get("content-type"); ok {
ct = ctype
} else if ctype, ok := md.Get("x-content-type"); ok {
ct = ctype
md.Del("x-content-type")
if ctype := md.Get("content-type"); ctype != nil {
ct = ctype[0]
}
// create new context
ctx = metadata.NewIncomingContext(ctx, md)
ctx = metadata.NewOutgoingContext(ctx, metadata.New(0))
ctx = context.WithValue(ctx, rspMetadataKey{}, &rspMetadataVal{m: metadata.New(0)})
stream = &streamWrapper{ctx, stream}
@@ -315,7 +291,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get peer from context
if p, ok := peer.FromContext(ctx); ok {
md.Set("Remote", p.Addr.String())
md.Set("remote", p.Addr.String())
ctx = peer.NewContext(ctx, p)
}
@@ -412,7 +388,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
return err
}
g.opts.Hooks.EachNext(func(hook options.Hook) {
g.opts.Hooks.EachPrev(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
@@ -422,13 +398,22 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
statusDesc := ""
// execute the handler
appErr := fn(ctx, r, replyv.Interface())
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil {
if md := getResponseMetadata(ctx); len(md) > 0 {
if err = stream.SendHeader(md.AsHTTP2()); err != nil {
return err
}
}
if appErr != nil {
var err error
var errStatus *status.Status
var ok bool
errStatus, ok = status.FromError(appErr)
if ok {
return errStatus.Err()
}
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
return errStatus.Err()
}
switch verr := appErr.(type) {
case *errors.Error:
statusCode = microError(verr)
@@ -438,16 +423,14 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
// user defined error that proto based we can attach it to grpc status
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr))
errStatus, err = status.New(statusCode, statusDesc).WithDetails(protoadapt.MessageV1Of(verr))
if err != nil {
return err
}
case (interface{ GRPCStatus() *status.Status }):
errStatus = verr.GRPCStatus()
default:
g.RLock()
g.mu.RLock()
config := g.opts
g.RUnlock()
g.mu.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
}
@@ -496,7 +479,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
opts.Hooks.EachPrev(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
@@ -506,14 +489,22 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
statusDesc := ""
appErr := fn(ctx, r, ss)
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil {
if md := getResponseMetadata(ctx); len(md) > 0 {
if err := stream.SendHeader(md.AsHTTP2()); err != nil {
return err
}
}
if appErr != nil {
var err error
var errStatus *status.Status
var ok bool
errStatus, ok = status.FromError(appErr)
if ok {
return errStatus.Err()
}
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
return errStatus.Err()
}
switch verr := appErr.(type) {
case *errors.Error:
statusCode = microError(verr)
@@ -523,7 +514,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
// user defined error that proto based we can attach it to grpc status
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr))
errStatus, err = status.New(statusCode, statusDesc).WithDetails(protoadapt.MessageV1Of(verr))
if err != nil {
return err
}
@@ -543,25 +534,10 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
return status.New(statusCode, statusDesc).Err()
}
func (g *Server) newCodec(ct string) (codec.Codec, error) {
g.RLock()
defer g.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
if c, ok := g.opts.Codecs[ct]; ok {
return c, nil
}
return nil, codec.ErrUnknownContentType
}
func (g *Server) Options() server.Options {
g.RLock()
g.mu.RLock()
opts := g.opts
g.RUnlock()
g.mu.RUnlock()
return opts
}
@@ -583,39 +559,11 @@ func (g *Server) Handle(h server.Handler) error {
return nil
}
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (g *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
g.Lock()
if _, ok = g.subscribers[sub]; ok {
g.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
g.subscribers[sub] = nil
g.Unlock()
return nil
}
func (g *Server) Register() error {
g.RLock()
g.mu.RLock()
rsvc := g.rsvc
config := g.opts
g.RUnlock()
g.mu.RUnlock()
// if service already filled, reuse it and return early
if rsvc != nil {
@@ -630,7 +578,7 @@ func (g *Server) Register() error {
return err
}
g.RLock()
g.mu.RLock()
// Maps are ordered randomly, sort the keys for consistency
handlerList := make([]string, 0, len(g.handlers))
for n := range g.handlers {
@@ -640,31 +588,11 @@ func (g *Server) Register() error {
sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(g.subscribers))
for e := range g.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
g.mu.RUnlock()
endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, n := range handlerList {
endpoints = append(endpoints, g.handlers[n].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
g.RUnlock()
service.Nodes[0].Metadata["protocol"] = "grpc"
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
service.Endpoints = endpoints
g.RLock()
g.mu.RLock()
registered := g.registered
g.RUnlock()
g.mu.RUnlock()
if !registered {
if config.Logger.V(logger.InfoLevel) {
@@ -682,8 +610,8 @@ func (g *Server) Register() error {
return nil
}
g.Lock()
defer g.Unlock()
g.mu.Lock()
defer g.mu.Unlock()
g.registered = true
g.rsvc = service
@@ -694,9 +622,9 @@ func (g *Server) Register() error {
func (g *Server) Deregister() error {
var err error
g.RLock()
g.mu.RLock()
config := g.opts
g.RUnlock()
g.mu.RUnlock()
service, err := server.NewRegisterService(g)
if err != nil {
@@ -711,47 +639,27 @@ func (g *Server) Deregister() error {
return err
}
g.Lock()
g.mu.Lock()
g.rsvc = nil
if !g.registered {
g.Unlock()
g.mu.Unlock()
return nil
}
g.registered = false
wg := sync.WaitGroup{}
for sb, subs := range g.subscribers {
for _, sub := range subs {
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(g.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err)
}
}
}(sub)
}
g.subscribers[sb] = nil
}
wg.Wait()
g.Unlock()
g.mu.Unlock()
return nil
}
func (g *Server) Start() error {
g.RLock()
g.mu.RLock()
if g.started {
g.RUnlock()
g.mu.RUnlock()
return nil
}
g.RUnlock()
g.mu.RUnlock()
config := g.Options()
@@ -781,27 +689,12 @@ func (g *Server) Start() error {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
}
g.Lock()
g.mu.Lock()
g.opts.Address = ts.Addr().String()
if len(g.opts.Advertise) == 0 {
g.opts.Advertise = ts.Addr().String()
}
g.Unlock()
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err = config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
g.mu.Unlock()
// use RegisterCheck func before register
// nolint: nestif
@@ -818,10 +711,6 @@ func (g *Server) Start() error {
}
}
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept)
go func() {
if err = g.srv.Serve(ts); err != nil {
@@ -834,6 +723,9 @@ func (g *Server) Start() error {
}
}
}
g.stateLive.Store(1)
g.stateReady.Store(1)
g.stateHealth.Store(1)
}()
go func() {
@@ -853,9 +745,9 @@ func (g *Server) Start() error {
select {
// register self on interval
case <-t.C:
g.RLock()
g.mu.RLock()
registered := g.registered
g.RUnlock()
g.mu.RUnlock()
rerr := g.opts.RegisterCheck(g.opts.Context)
// nolint: nestif
if rerr != nil && registered {
@@ -903,12 +795,18 @@ func (g *Server) Start() error {
go func() {
g.srv.GracefulStop()
close(exit)
g.stateLive.Store(0)
g.stateReady.Store(0)
g.stateHealth.Store(0)
}()
select {
case <-exit:
case <-time.After(g.opts.GracefulTimeout):
g.srv.Stop()
g.stateLive.Store(0)
g.stateReady.Store(0)
g.stateHealth.Store(0)
}
// close transport
@@ -926,29 +824,29 @@ func (g *Server) Start() error {
}()
// mark the server as started
g.Lock()
g.mu.Lock()
g.started = true
g.Unlock()
g.mu.Unlock()
return nil
}
func (g *Server) Stop() error {
g.RLock()
g.mu.RLock()
if !g.started {
g.RUnlock()
g.mu.RUnlock()
return nil
}
g.RUnlock()
g.mu.RUnlock()
ch := make(chan error)
g.exit <- ch
err := <-ch
g.Lock()
g.mu.Lock()
g.rsvc = nil
g.started = false
g.Unlock()
g.mu.Unlock()
return err
}
@@ -965,6 +863,18 @@ func (g *Server) GRPCServer() *grpc.Server {
return g.srv
}
func (g *Server) Live() bool {
return g.stateLive.Load() == 1
}
func (g *Server) Ready() bool {
return g.stateReady.Load() == 1
}
func (g *Server) Health() bool {
return g.stateHealth.Load() == 1
}
func NewServer(opts ...server.Option) *Server {
return newServer(opts...)
}

View File

@@ -3,43 +3,25 @@ package grpc
import (
"reflect"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
)
type rpcHandler struct {
opts server.HandlerOptions
handler interface{}
name string
endpoints []*register.Endpoint
opts server.HandlerOptions
handler interface{}
name string
}
func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
name: name,
handler: handler,
opts: options,
}
}
@@ -51,10 +33,6 @@ func (r *rpcHandler) Handler() interface{} {
return r.handler
}
func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints
}
func (r *rpcHandler) Options() server.HandlerOptions {
return r.opts
}

47
metadata.go Normal file
View File

@@ -0,0 +1,47 @@
package grpc
import (
"context"
"go.unistack.org/micro/v4/metadata"
)
type (
rspMetadataKey struct{}
rspMetadataVal struct {
m metadata.Metadata
}
)
// AppendResponseMetadata adds metadata entries to metadata.Metadata stored in the context.
// It expects the context to contain a *rspMetadataVal value under the rspMetadataKey{} key.
// If the value is missing or invalid, the function does nothing.
//
// Note: this function is not thread-safe. Synchronization is required if used from multiple goroutines.
func AppendResponseMetadata(ctx context.Context, md metadata.Metadata) {
if md == nil {
return
}
val, ok := ctx.Value(rspMetadataKey{}).(*rspMetadataVal)
if !ok || val == nil || val.m == nil {
return
}
for key, values := range md {
val.m.Append(key, values...)
}
}
// getResponseMetadata retrieves the metadata.Metadata stored in the context.
//
// Note: this function is not thread-safe. Synchronization is required if used from multiple goroutines.
// If you plan to modify the returned metadata, make a full copy to avoid affecting shared state.
func getResponseMetadata(ctx context.Context) metadata.Metadata {
val, ok := ctx.Value(rspMetadataKey{}).(*rspMetadataVal)
if !ok || val == nil || val.m == nil {
return nil
}
return val.m
}

136
metadata_test.go Normal file
View File

@@ -0,0 +1,136 @@
package grpc
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"go.unistack.org/micro/v4/metadata"
)
func TestAppendResponseMetadata(t *testing.T) {
tests := []struct {
name string
ctx context.Context
md metadata.Metadata
expected context.Context
}{
{
name: "nil metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: nil,
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
},
{
name: "empty metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Metadata{},
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
},
{
name: "context without response metadata key",
ctx: context.Background(),
md: metadata.Pairs("key1", "val1"),
expected: context.Background(),
},
{
name: "context with nil response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, nil),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, nil),
},
{
name: "context with incorrect type in response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, struct{}{}),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, struct{}{}),
},
{
name: "context with response metadata value, but nil metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
},
{
name: "basic metadata append",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Metadata{
"key1": []string{"val1"},
},
}),
},
{
name: "multiple values for same key",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Pairs("key1", "val1", "key1", "val2"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Metadata{
"key1": []string{"val1", "val2"},
},
}),
},
{
name: "multiple values for different keys",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Pairs("key1", "val1", "key1", "val2", "key2", "val3", "key2", "val4", "key3", "val5"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Metadata{
"key1": []string{"val1", "val2"},
"key2": []string{"val3", "val4"},
"key3": []string{"val5"},
},
}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
AppendResponseMetadata(tt.ctx, tt.md)
require.Equal(t, tt.expected, tt.ctx)
})
}
}
func TestGetResponseMetadata(t *testing.T) {
tests := []struct {
name string
ctx context.Context
expected metadata.Metadata
}{
{
name: "context without response metadata key",
ctx: context.Background(),
expected: nil,
},
{
name: "context with nil response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, nil),
expected: nil,
},
{
name: "context with incorrect type in response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &struct{}{}),
expected: nil,
},
{
name: "context with response metadata value, but nil metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
expected: nil,
},
{
name: "valid metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Pairs("key1", "value1"),
}),
expected: metadata.Metadata{"key1": {"value1"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, getResponseMetadata(tt.ctx))
})
}
}

View File

@@ -3,7 +3,7 @@ package grpc
import (
"context"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)

View File

@@ -1,489 +0,0 @@
//go:build ignore
// +build ignore
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
Package reflection implements server reflection service.
The service implemented is defined in:
https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto.
To register server reflection on a gRPC server:
import "google.golang.org/grpc/reflection"
s := grpc.NewServer()
pb.RegisterYourOwnServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
s.Serve(lis)
*/
package grpc
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"reflect"
"sort"
"sync"
"github.com/golang/protobuf/proto"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)
type serverReflectionServer struct {
rpb.UnimplementedServerReflectionServer
s *grpc.Server
initSymbols sync.Once
serviceNames []string
symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files
}
// Register registers the server reflection service on the given gRPC server.
func Register(s *grpc.Server) {
rpb.RegisterServerReflectionServer(s, &serverReflectionServer{
s: s,
})
}
// protoMessage is used for type assertion on proto messages.
// Generated proto message implements function Descriptor(), but Descriptor()
// is not part of interface proto.Message. This interface is needed to
// call Descriptor().
type protoMessage interface {
Descriptor() ([]byte, []int)
}
func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
s.initSymbols.Do(func() {
serviceInfo := s.s.GetServiceInfo()
s.symbols = map[string]*dpb.FileDescriptorProto{}
s.serviceNames = make([]string, 0, len(serviceInfo))
processed := map[string]struct{}{}
for svc, info := range serviceInfo {
s.serviceNames = append(s.serviceNames, svc)
fdenc, ok := parseMetadata(info.Metadata)
if !ok {
continue
}
fd, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
s.processFile(fd, processed)
}
sort.Strings(s.serviceNames)
})
return s.serviceNames, s.symbols
}
func (s *serverReflectionServer) processFile(fd *dpb.FileDescriptorProto, processed map[string]struct{}) {
filename := fd.GetName()
if _, ok := processed[filename]; ok {
return
}
processed[filename] = struct{}{}
prefix := fd.GetPackage()
for _, msg := range fd.MessageType {
s.processMessage(fd, prefix, msg)
}
for _, en := range fd.EnumType {
s.processEnum(fd, prefix, en)
}
for _, ext := range fd.Extension {
s.processField(fd, prefix, ext)
}
for _, svc := range fd.Service {
svcName := fqn(prefix, svc.GetName())
s.symbols[svcName] = fd
for _, meth := range svc.Method {
name := fqn(svcName, meth.GetName())
s.symbols[name] = fd
}
}
for _, dep := range fd.Dependency {
fdenc := proto.FileDescriptor(dep)
fdDep, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
s.processFile(fdDep, processed)
}
}
func (s *serverReflectionServer) processMessage(fd *dpb.FileDescriptorProto, prefix string, msg *dpb.DescriptorProto) {
msgName := fqn(prefix, msg.GetName())
s.symbols[msgName] = fd
for _, nested := range msg.NestedType {
s.processMessage(fd, msgName, nested)
}
for _, en := range msg.EnumType {
s.processEnum(fd, msgName, en)
}
for _, ext := range msg.Extension {
s.processField(fd, msgName, ext)
}
for _, fld := range msg.Field {
s.processField(fd, msgName, fld)
}
for _, oneof := range msg.OneofDecl {
oneofName := fqn(msgName, oneof.GetName())
s.symbols[oneofName] = fd
}
}
func (s *serverReflectionServer) processEnum(fd *dpb.FileDescriptorProto, prefix string, en *dpb.EnumDescriptorProto) {
enName := fqn(prefix, en.GetName())
s.symbols[enName] = fd
for _, val := range en.Value {
valName := fqn(enName, val.GetName())
s.symbols[valName] = fd
}
}
func (s *serverReflectionServer) processField(fd *dpb.FileDescriptorProto, prefix string, fld *dpb.FieldDescriptorProto) {
fldName := fqn(prefix, fld.GetName())
s.symbols[fldName] = fd
}
func fqn(prefix, name string) string {
if prefix == "" {
return name
}
return prefix + "." + name
}
// fileDescForType gets the file descriptor for the given type.
// The given type should be a proto message.
func (s *serverReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(protoMessage)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
enc, _ := m.Descriptor()
return decodeFileDesc(enc)
}
// decodeFileDesc does decompression and unmarshalling on the given
// file descriptor byte slice.
func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) {
raw, err := decompress(enc)
if err != nil {
return nil, fmt.Errorf("failed to decompress enc: %v", err)
}
fd := new(dpb.FileDescriptorProto)
if err := proto.Unmarshal(raw, fd); err != nil {
return nil, fmt.Errorf("bad descriptor: %v", err)
}
return fd, nil
}
// decompress does gzip decompression.
func decompress(b []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
}
return out, nil
}
func typeForName(name string) (reflect.Type, error) {
pt := proto.MessageType(name)
if pt == nil {
return nil, fmt.Errorf("unknown type: %q", name)
}
st := pt.Elem()
return st, nil
}
func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
var extDesc *proto.ExtensionDesc
for id, desc := range proto.RegisteredExtensions(m) {
if id == ext {
extDesc = desc
break
}
}
if extDesc == nil {
return nil, fmt.Errorf("failed to find registered extension for extension number %v", ext)
}
return decodeFileDesc(proto.FileDescriptor(extDesc.Filename))
}
func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
exts := proto.RegisteredExtensions(m)
out := make([]int32, 0, len(exts))
for id := range exts {
out = append(out, id)
}
return out, nil
}
// fileDescWithDependencies returns a slice of serialized fileDescriptors in
// wire format ([]byte). The fileDescriptors will include fd and all the
// transitive dependencies of fd with names not in sentFileDescriptors.
func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors map[string]bool) ([][]byte, error) {
r := [][]byte{}
queue := []*dpb.FileDescriptorProto{fd}
for len(queue) > 0 {
currentfd := queue[0]
queue = queue[1:]
if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.GetName()] = true
currentfdEncoded, err := proto.Marshal(currentfd)
if err != nil {
return nil, err
}
r = append(r, currentfdEncoded)
}
for _, dep := range currentfd.Dependency {
fdenc := proto.FileDescriptor(dep)
fdDep, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
queue = append(queue, fdDep)
}
}
return r, nil
}
// fileDescEncodingByFilename finds the file descriptor for given filename,
// finds all of its previously unsent transitive dependencies, does marshalling
// on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
enc := proto.FileDescriptor(name)
if enc == nil {
return nil, fmt.Errorf("unknown file: %v", name)
}
fd, err := decodeFileDesc(enc)
if err != nil {
return nil, err
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// parseMetadata finds the file descriptor bytes specified meta.
// For SupportPackageIsVersion4, m is the name of the proto file, we
// call proto.FileDescriptor to get the byte slice.
// For SupportPackageIsVersion3, m is a byte slice itself.
func parseMetadata(meta interface{}) ([]byte, bool) {
// Check if meta is the file name.
if fileNameForMeta, ok := meta.(string); ok {
return proto.FileDescriptor(fileNameForMeta), true
}
// Check if meta is the byte slice.
if enc, ok := meta.([]byte); ok {
return enc, true
}
return nil, false
}
// fileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result. The given symbol
// can be a type, a service or a method.
func (s *serverReflectionServer) fileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
_, symbols := s.getSymbols()
fd := symbols[name]
if fd == nil {
// Check if it's a type name that was not present in the
// transitive dependencies of the registered services.
if st, err := typeForName(name); err == nil {
fd, err = s.fileDescForType(st)
if err != nil {
return nil, err
}
}
}
if fd == nil {
return nil, fmt.Errorf("unknown symbol: %v", name)
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// fileDescEncodingContainingExtension finds the file descriptor containing
// given extension, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
st, err := typeForName(typeName)
if err != nil {
return nil, err
}
fd, err := fileDescContainingExtension(st, extNum)
if err != nil {
return nil, err
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// allExtensionNumbersForTypeName returns all extension numbers for the given type.
func (s *serverReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) {
st, err := typeForName(name)
if err != nil {
return nil, err
}
extNums, err := s.allExtensionNumbersForType(st)
if err != nil {
return nil, err
}
return extNums, nil
}
// ServerReflectionInfo is the reflection service handler.
func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflection_ServerReflectionInfoServer) error {
sentFileDescriptors := make(map[string]bool)
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
out := &rpb.ServerReflectionResponse{
ValidHost: in.Host,
OriginalRequest: in,
}
switch req := in.MessageRequest.(type) {
case *rpb.ServerReflectionRequest_FileByFilename:
b, err := s.fileDescEncodingByFilename(req.FileByFilename, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_FileContainingSymbol:
b, err := s.fileDescEncodingContainingSymbol(req.FileContainingSymbol, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_FileContainingExtension:
typeName := req.FileContainingExtension.ContainingType
extNum := req.FileContainingExtension.ExtensionNumber
b, err := s.fileDescEncodingContainingExtension(typeName, extNum, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_AllExtensionNumbersOfType:
extNums, err := s.allExtensionNumbersForTypeName(req.AllExtensionNumbersOfType)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{
BaseTypeName: req.AllExtensionNumbersOfType,
ExtensionNumber: extNums,
},
}
}
case *rpb.ServerReflectionRequest_ListServices:
svcNames, _ := s.getSymbols()
serviceResponses := make([]*rpb.ServiceResponse, len(svcNames))
for i, n := range svcNames {
serviceResponses[i] = &rpb.ServiceResponse{
Name: n,
}
}
out.MessageResponse = &rpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &rpb.ListServiceResponse{
Service: serviceResponses,
},
}
default:
return status.Errorf(codes.InvalidArgument, "invalid MessageRequest: %v", in.MessageRequest)
}
if err := stream.Send(out); err != nil {
return err
}
}
}

View File

@@ -4,8 +4,7 @@ import (
"fmt"
"testing"
_ "go.unistack.org/micro-server-grpc/v3/proto"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
"google.golang.org/grpc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
@@ -50,6 +49,7 @@ func (r *reflector) RangeExtensionsByMessage(message protoreflect.FullName, f fu
}
func TestReflector(t *testing.T) {
t.Skip()
srv := NewServer(Reflection(&reflector{}), server.Address(":12345"))
if err := srv.Init(); err != nil {
t.Fatal(err)

View File

@@ -1,17 +1,14 @@
package grpc
import (
"io"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
)
var _ server.Request = &rpcRequest{}
type rpcRequest struct {
rw io.ReadWriter
payload interface{}
codec codec.Codec
header metadata.Metadata

View File

@@ -1,17 +1,14 @@
package grpc
import (
"io"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/server"
)
var _ server.Response = &rpcResponse{}
type rpcResponse struct {
rw io.ReadWriter
header metadata.Metadata
codec codec.Codec
}

View File

@@ -14,7 +14,7 @@ import (
"unicode"
"unicode/utf8"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
)
// Precompute the reflect type for error. Can't use error directly
@@ -47,8 +47,8 @@ type rServer struct {
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(r)
}
// Is this type exported or a builtin?
@@ -123,43 +123,43 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
}
func (server *rServer) register(rcvr interface{}) error {
server.mu.Lock()
defer server.mu.Unlock()
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
func (s *rServer) register(rcvr interface{}) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.serviceMap == nil {
s.serviceMap = make(map[string]*service)
}
s := &service{}
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
srv := &service{}
srv.typ = reflect.TypeOf(rcvr)
srv.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(srv.rcvr).Type().Name()
if sname == "" {
return fmt.Errorf("rpc: no service name for type %v", s.typ.String())
return fmt.Errorf("rpc: no service name for type %v", srv.typ.String())
}
if !isExported(sname) {
return fmt.Errorf("rpc Register: type %s is not exported", sname)
}
if _, present := server.serviceMap[sname]; present {
return fmt.Errorf("rpc: service already defined: " + sname)
if _, present := s.serviceMap[sname]; present {
return fmt.Errorf("rpc: service already defined: %s", sname)
}
s.name = sname
s.method = make(map[string]*methodType)
srv.name = sname
srv.method = make(map[string]*methodType)
// Install the methods
for m := 0; m < s.typ.NumMethod(); m++ {
method := s.typ.Method(m)
for m := 0; m < srv.typ.NumMethod(); m++ {
method := srv.typ.Method(m)
mt, err := prepareEndpoint(method)
if mt != nil && err == nil {
s.method[method.Name] = mt
srv.method[method.Name] = mt
} else if err != nil {
return err
}
}
if len(s.method) == 0 {
if len(srv.method) == 0 {
return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname)
}
server.serviceMap[s.name] = s
s.serviceMap[srv.name] = srv
return nil
}

View File

@@ -3,7 +3,7 @@ package grpc
import (
"context"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v4/server"
"google.golang.org/grpc"
)

View File

@@ -1,285 +0,0 @@
package grpc
import (
"context"
"fmt"
"reflect"
"strings"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
)
var _ server.Message = &rpcMessage{}
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*register.Endpoint
opts server.SubscriberOptions
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
var endpoints []*register.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &register.Endpoint{
Name: "Func",
Request: register.ExtractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &register.Endpoint{
Name: name + "." + method.Name,
Request: register.ExtractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = make(map[string]string)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
cf, err := g.newCodec(ct)
if err != nil {
return err
}
hdr := make(map[string]string, len(msg.Header))
for k, v := range msg.Header {
hdr[k] = v
}
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
if g.wg != nil {
g.wg.Add(1)
}
go func() {
if g.wg != nil {
defer g.wg.Done()
}
cerr := fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
})
results <- cerr
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*register.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() server.SubscriberOptions {
return s.opts
}
func (g *Server) subscribe() error {
config := g.opts
subCtx := config.Context
for sb := range g.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts := []broker.SubscribeOption{
broker.SubscribeContext(subCtx),
broker.SubscribeAutoAck(sb.Options().AutoAck),
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), g.createSubHandler(sb, config), opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}