Compare commits

...

25 Commits
v3.10.28 ... v4

Author SHA1 Message Date
vtolstov
c4d90c8a2b Apply Code Coverage Badge 2025-06-08 14:11:25 +00:00
2b3c413adc fixup grpc error codes in unary and stream processing
All checks were successful
sync / sync (push) Successful in 26s
coverage / build (push) Successful in 1m51s
test / test (push) Successful in 2m40s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-06-08 11:16:43 +03:00
20fb19fee9 changed embedded mutex to private field (#278)
Some checks failed
coverage / build (push) Successful in 2m26s
test / test (push) Failing after 16m18s
sync / sync (push) Successful in 8s
2025-05-14 01:25:57 +03:00
8e3c56f4ed update ci (#277)
All checks were successful
sync / sync (push) Successful in 10s
2025-05-05 19:19:33 +03:00
fcbae6f94a added commit hash check to avoid unnecessary repository cloning (#275)
All checks were successful
sync / sync (push) Successful in 10s
2025-05-05 13:44:43 +03:00
2f818d389b Merge branch 'v4' of https://git.unistack.org/unistack-org/micro-server-grpc into v4
All checks were successful
sync / sync (push) Successful in 14s
2025-05-04 15:02:46 +03:00
d55cb59531 fixup sync
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-04 15:02:02 +03:00
8c416b10ef Обновить .github/workflows/job_sync.yml
Some checks failed
sync / sync (push) Has been cancelled
2025-05-02 18:55:11 +03:00
d3c2ae5f54 convert headers to HTTP/2 format before SendHeader() (#273)
All checks were successful
coverage / build (push) Has been skipped
test / test (push) Successful in 2m19s
2025-05-02 18:51:30 +03:00
3bb8ec0753 Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Has been skipped
2025-05-01 22:07:43 +03:00
27bca35eb6 Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Has been skipped
2025-05-01 22:01:33 +03:00
498218912c Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Successful in 23s
2025-05-01 21:59:18 +03:00
bd04f5b9cb Обновить .github/workflows/job_sync.yml 2025-05-01 21:58:56 +03:00
dc976006ad Обновить .github/workflows/job_sync.yml
All checks were successful
sync / sync (push) Has been skipped
2025-05-01 21:05:02 +03:00
934ebf6c0a fix uninitialized response metadata for incoming context (#264)
All checks were successful
coverage / build (push) Has been skipped
test / test (push) Successful in 2m8s
sync / sync (push) Successful in 18s
2025-05-01 20:42:59 +03:00
6d8fce53dd update ci (#265) 2025-05-01 20:42:52 +03:00
f12f3fb2c2 Обновить .github/workflows/job_coverage.yml
All checks were successful
sync / sync (push) Successful in 15s
2025-05-01 20:34:33 +03:00
vtolstov
5a755437c9 Apply Code Coverage Badge 2025-04-29 15:43:08 +00:00
05db1f3dae [v4] fix ci pipeline (#260)
All checks were successful
coverage / build (push) Successful in 1m54s
test / test (push) Successful in 2m42s
* attempt to fix coverage job

* Apply Code Coverage Badge

---------

Co-authored-by: pugnack <pugnack@users.noreply.github.com>
2025-04-29 18:39:27 +03:00
e4ba134fa6 [v4] breaking change: modify API for working with response metadata (#255)
Some checks failed
coverage / build (push) Failing after 40s
test / test (push) Successful in 3m3s
sync / sync (push) Successful in 17s
* implement functions to append/get metadata

* сhanged behavior to return nil instead of empty metadata for getResponseMetadata()

* removed metadata copy when passing to gRPC headers

---------

Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 18:34:11 +03:00
8a85989b79 update all
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 18:30:08 +03:00
56185faabe fixup coverage job
All checks were successful
sync / sync (push) Successful in 26s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 14:16:34 +03:00
823b2bdc52 fixup for latest micro
Some checks failed
coverage / build (push) Failing after 1m21s
test / test (push) Successful in 4m8s
sync / sync (push) Failing after 8s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-29 14:08:59 +03:00
48906c5612 improve sync
Some checks failed
coverage / build (push) Failing after 5m36s
test / test (push) Successful in 11m32s
sync / sync (push) Failing after 41s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 21:38:23 +03:00
23a1ff424e prepare v4 (need swap target!) (#195)
All checks were successful
test / test (push) Successful in 1m53s
move to v4 micro

Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Reviewed-on: #195
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2025-03-02 21:13:25 +03:00
29 changed files with 569 additions and 579 deletions

View File

@@ -1,14 +0,0 @@
---
name: Question
about: Ask a question about go-micro
title: ''
labels: ''
assignees: ''
---
Before asking, please check if your question has already been answered:
1. Check the documentation - https://micro.mu/docs/
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
3. Search existing issues

View File

@@ -1,6 +1,6 @@
--- ---
name: Bug report name: Bug report
about: For reporting bugs in go-micro about: For reporting bugs in micro
title: "[BUG]" title: "[BUG]"
labels: '' labels: ''
assignees: '' assignees: ''
@@ -16,9 +16,3 @@ assignees: ''
**How to reproduce the bug:** **How to reproduce the bug:**
If possible, please include a minimal code snippet here. If possible, please include a minimal code snippet here.
**Environment:**
Go Version: please paste `go version` output here
```
please paste `go env` output here
```

View File

@@ -1,6 +1,6 @@
--- ---
name: Feature request / Enhancement name: Feature request / Enhancement
about: If you have a need not served by go-micro about: If you have a need not served by micro
title: "[FEATURE]" title: "[FEATURE]"
labels: '' labels: ''
assignees: '' assignees: ''

8
.github/ISSUE_TEMPLATE/question.md vendored Normal file
View File

@@ -0,0 +1,8 @@
---
name: Question
about: Ask a question about micro
title: ''
labels: ''
assignees: ''
---

28
.github/autoapprove.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
workflow_run:
workflows: ["prbuild"]
types:
- completed
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
"chmod +x ./tea",
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
"./tea pr --repo ${{ github.event.repository.name }}"
]
if: github.actor == 'vtolstov'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

53
.github/workflows/job_coverage.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
jobs:
build:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: test coverage
run: |
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out
- name: coverage badge
uses: tj-actions/coverage-badge-go@v2
with:
green: 80
filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: false
skip_checkout: false
file_pattern: ./README.md
- name: push
if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master
with:
github_token: ${{ github.token }}
branch: ${{ github.ref }}

View File

@@ -3,10 +3,10 @@ name: lint
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize] types: [opened, reopened, synchronize]
branches: branches: [ master, v3, v4 ]
- master paths-ignore:
- v3 - '.github/**'
- v4 - '.gitea/**'
jobs: jobs:
lint: lint:
@@ -24,6 +24,6 @@ jobs:
- name: setup deps - name: setup deps
run: go get -v ./... run: go get -v ./...
- name: run lint - name: run lint
uses: https://github.com/golangci/golangci-lint-action@v6 uses: golangci/golangci-lint-action@v6
with: with:
version: 'latest' version: 'latest'

94
.github/workflows/job_sync.yml vendored Normal file
View File

@@ -0,0 +1,94 @@
name: sync
on:
schedule:
- cron: '*/5 * * * *'
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
sync:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: init
run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: check master
id: check_master
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync master
if: steps.check_master.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream master
git push upstream master --progress
git push origin master --progress
cd ../
rm -rf repo
- name: check v3
id: check_v3
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v3
if: steps.check_v3.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v3
git push upstream v3 --progress
git push origin v3 --progress
cd ../
rm -rf repo
- name: check v4
id: check_v4
run: |
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
echo "src_hash=$src_hash"
echo "dst_hash=$dst_hash"
if [ "$src_hash" != "$dst_hash" ]; then
echo "sync_needed=true" >> $GITHUB_OUTPUT
else
echo "sync_needed=false" >> $GITHUB_OUTPUT
fi
- name: sync v4
if: steps.check_v4.outputs.sync_needed == 'true'
run: |
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git pull --rebase upstream v4
git push upstream v4 --progress
git push origin v4 --progress
cd ../
rm -rf repo

View File

@@ -3,15 +3,12 @@ name: test
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize] types: [opened, reopened, synchronize]
branches: branches: [ master, v3, v4 ]
- master
- v3
- v4
push: push:
branches: branches: [ master, v3, v4 ]
- master paths-ignore:
- v3 - '.github/**'
- v4 - '.gitea/**'
jobs: jobs:
test: test:

View File

@@ -3,15 +3,12 @@ name: test
on: on:
pull_request: pull_request:
types: [opened, reopened, synchronize] types: [opened, reopened, synchronize]
branches: branches: [ master, v3, v4 ]
- master
- v3
- v4
push: push:
branches: branches: [ master, v3, v4 ]
- master paths-ignore:
- v3 - '.github/**'
- v4 - '.gitea/**'
jobs: jobs:
test: test:
@@ -35,19 +32,19 @@ jobs:
go-version: 'stable' go-version: 'stable'
- name: setup go work - name: setup go work
env: env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work GOWORK: ${{ github.workspace }}/go.work
run: | run: |
go work init go work init
go work use . go work use .
go work use micro-tests go work use micro-tests
- name: setup deps - name: setup deps
env: env:
GOWORK: /workspace/${{ github.repository_owner }}/go.work GOWORK: ${{ github.workspace }}/go.work
run: go get -v ./... run: go get -v ./...
- name: run tests - name: run tests
env: env:
INTEGRATION_TESTS: yes INTEGRATION_TESTS: yes
GOWORK: /workspace/${{ github.repository_owner }}/go.work GOWORK: ${{ github.workspace }}/go.work
run: | run: |
cd micro-tests cd micro-tests
go test -mod readonly -v ./... || true go test -mod readonly -v ./... || true

View File

@@ -1,5 +1,5 @@
run: run:
concurrency: 8 concurrency: 8
deadline: 5m timeout: 5m
issues-exit-code: 1 issues-exit-code: 1
tests: true tests: true

4
README.md Normal file
View File

@@ -0,0 +1,4 @@
# GRPC Server
![Coverage](https://img.shields.io/badge/Coverage-3.4%25-red)
This plugin is a grpc server for micro.

View File

@@ -1,7 +1,7 @@
package grpc package grpc
import ( import (
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )

View File

@@ -6,7 +6,7 @@ import (
"net/http" "net/http"
"os" "os"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/errors"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
) )

View File

@@ -1,4 +1,4 @@
package grpc package grpc
//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest //go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto" //go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v4) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto"

26
go.mod
View File

@@ -1,23 +1,27 @@
module go.unistack.org/micro-server-grpc/v3 module go.unistack.org/micro-server-grpc/v4
go 1.22.0 go 1.23.0
toolchain go1.24.2
require ( require (
go.unistack.org/micro/v3 v3.11.30 github.com/stretchr/testify v1.10.0
golang.org/x/net v0.33.0 go.unistack.org/micro/v4 v4.1.8
google.golang.org/grpc v1.69.2 golang.org/x/net v0.39.0
google.golang.org/protobuf v1.36.1 google.golang.org/grpc v1.72.0
google.golang.org/protobuf v1.36.6
) )
require ( require (
github.com/ash3in/uuidv8 v1.2.0 // indirect github.com/ash3in/uuidv8 v1.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/matoous/go-nanoid v1.5.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect
go.unistack.org/micro-proto/v3 v3.4.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
golang.org/x/sys v0.28.0 // indirect github.com/spf13/cast v1.7.1 // indirect
golang.org/x/text v0.21.0 // indirect go.unistack.org/micro-proto/v4 v4.1.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

67
go.sum
View File

@@ -2,7 +2,10 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
@@ -19,36 +22,42 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4= github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U= github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.unistack.org/micro/v3 v3.11.30 h1:XTLgZubSGzQL85IUMp1pTJnS1lP4eFwTZyelV/SzOMc= go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.unistack.org/micro/v3 v3.11.30/go.mod h1:fvOkXKs3wKHToWH6Mxy+aovEiDl2q4UlOCdVfJdziBU= go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= go.unistack.org/micro/v4 v4.1.8 h1:bbqnQ7nzTYNIYt4HhRgmh/++qzwGJvlNbrTOoXzkQiU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= go.unistack.org/micro/v4 v4.1.8/go.mod h1:b4dr7RFlbpSfSsKCva9UuX4zLtkYQteEK+Uuac39qJE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

280
grpc.go
View File

@@ -10,7 +10,6 @@ import (
"slices" "slices"
"sort" "sort"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -19,18 +18,16 @@ import (
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1" reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
// nolint: staticcheck // nolint: staticcheck
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/meter"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v3/options" "go.unistack.org/micro/v4/register"
"go.unistack.org/micro/v3/register" "go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/tracer"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@@ -46,13 +43,6 @@ const (
DefaultContentType = "application/grpc" DefaultContentType = "application/grpc"
) )
/*
type ServerReflection struct {
srv *grpc.Server
s *serverReflectionServer
}
*/
type streamWrapper struct { type streamWrapper struct {
ctx context.Context ctx context.Context
grpc.ServerStream grpc.ServerStream
@@ -69,18 +59,16 @@ type Server struct {
handlers map[string]server.Handler handlers map[string]server.Handler
srv *grpc.Server srv *grpc.Server
exit chan chan error exit chan chan error
wg *msync.WaitGroup
rsvc *register.Service rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer rpc *rServer
opts server.Options opts server.Options
unknownHandler grpc.StreamHandler unknownHandler grpc.StreamHandler
sync.RWMutex mu sync.RWMutex
stateLive *atomic.Uint32 stateLive *atomic.Uint32
stateReady *atomic.Uint32 stateReady *atomic.Uint32
stateHealth *atomic.Uint32 stateHealth *atomic.Uint32
started bool started bool
registered bool registered bool
// reflection bool // reflection bool
} }
@@ -92,7 +80,6 @@ func newServer(opts ...server.Option) *Server {
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
}, },
handlers: make(map[string]server.Handler), handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error), exit: make(chan chan error),
stateLive: &atomic.Uint32{}, stateLive: &atomic.Uint32{},
stateReady: &atomic.Uint32{}, stateReady: &atomic.Uint32{},
@@ -104,25 +91,9 @@ func newServer(opts ...server.Option) *Server {
return g return g
} }
/*
type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
}
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
return r.m(ctx, msg)
}
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
return r.h(ctx, req, rsp)
}
*/
func (g *Server) configure(opts ...server.Option) error { func (g *Server) configure(opts ...server.Option) error {
g.Lock() g.mu.Lock()
defer g.Unlock() defer g.mu.Unlock()
for _, o := range opts { for _, o := range opts {
o(&g.opts) o(&g.opts)
@@ -215,8 +186,9 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
return status.Errorf(codes.Internal, "method does not exist in context") return status.Errorf(codes.Internal, "method does not exist in context")
} }
var gmd map[string][]string
// get grpc metadata // get grpc metadata
gmd, ok := gmetadata.FromIncomingContext(ctx) gmd, ok = gmetadata.FromIncomingContext(ctx)
if !ok { if !ok {
gmd = gmetadata.MD{} gmd = gmetadata.MD{}
} }
@@ -249,25 +221,23 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
}() }()
} }
md := metadata.New(len(gmd)) md := metadata.Copy(gmd)
for k, v := range gmd {
md[k] = strings.Join(v, ", ") md.Set("path", fullMethod)
} md.Set("micro-server", "grpc")
md.Set("Path", fullMethod)
md.Set("Micro-Server", "grpc")
md.Set(metadata.HeaderEndpoint, methodName) md.Set(metadata.HeaderEndpoint, methodName)
md.Set(metadata.HeaderService, serviceName) md.Set(metadata.HeaderService, serviceName)
var td string var td string
// timeout for server deadline // timeout for server deadline
if v, ok := md.Get("timeout"); ok { if v := md.Get("timeout"); v != nil {
md.Del("timeout") md.Del("timeout")
td = v td = v[0]
} }
if v, ok := md.Get("Grpc-Timeout"); ok { if v := md.Get("grpc-timeout"); v != nil {
md.Del("Grpc-Timeout") md.Del("grpc-timeout")
td = v[:len(v)-1] td = v[0][:len(v)-1]
switch v[len(v)-1:] { switch v[0][len(v)-1:] {
case "S": case "S":
td += "s" td += "s"
case "M": case "M":
@@ -286,16 +256,14 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get content type // get content type
ct := DefaultContentType ct := DefaultContentType
if ctype, ok := md.Get("content-type"); ok { if ctype := md.Get("content-type"); ctype != nil {
ct = ctype ct = ctype[0]
} else if ctype, ok := md.Get("x-content-type"); ok {
ct = ctype
md.Del("x-content-type")
} }
// create new context // create new context
ctx = metadata.NewIncomingContext(ctx, md) ctx = metadata.NewIncomingContext(ctx, md)
ctx = metadata.NewOutgoingContext(ctx, metadata.New(0)) ctx = metadata.NewOutgoingContext(ctx, metadata.New(0))
ctx = context.WithValue(ctx, rspMetadataKey{}, &rspMetadataVal{m: metadata.New(0)})
stream = &streamWrapper{ctx, stream} stream = &streamWrapper{ctx, stream}
@@ -323,7 +291,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get peer from context // get peer from context
if p, ok := peer.FromContext(ctx); ok { if p, ok := peer.FromContext(ctx); ok {
md.Set("Remote", p.Addr.String()) md.Set("remote", p.Addr.String())
ctx = peer.NewContext(ctx, p) ctx = peer.NewContext(ctx, p)
} }
@@ -430,13 +398,22 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
statusDesc := "" statusDesc := ""
// execute the handler // execute the handler
appErr := fn(ctx, r, replyv.Interface()) appErr := fn(ctx, r, replyv.Interface())
if outmd, ok := metadata.FromOutgoingContext(ctx); ok { if md := getResponseMetadata(ctx); len(md) > 0 {
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil { if err = stream.SendHeader(md.AsHTTP2()); err != nil {
return err return err
} }
} }
if appErr != nil { if appErr != nil {
var err error
var errStatus *status.Status var errStatus *status.Status
var ok bool
errStatus, ok = status.FromError(appErr)
if ok {
return errStatus.Err()
}
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
return errStatus.Err()
}
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:
statusCode = microError(verr) statusCode = microError(verr)
@@ -450,12 +427,10 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
if err != nil { if err != nil {
return err return err
} }
case (interface{ GRPCStatus() *status.Status }):
errStatus = verr.GRPCStatus()
default: default:
g.RLock() g.mu.RLock()
config := g.opts config := g.opts
g.RUnlock() g.mu.RUnlock()
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message") config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
} }
@@ -514,14 +489,22 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
statusDesc := "" statusDesc := ""
appErr := fn(ctx, r, ss) appErr := fn(ctx, r, ss)
if outmd, ok := metadata.FromOutgoingContext(ctx); ok { if md := getResponseMetadata(ctx); len(md) > 0 {
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil { if err := stream.SendHeader(md.AsHTTP2()); err != nil {
return err return err
} }
} }
if appErr != nil { if appErr != nil {
var err error var err error
var errStatus *status.Status var errStatus *status.Status
var ok bool
errStatus, ok = status.FromError(appErr)
if ok {
return errStatus.Err()
}
if errStatus = status.FromContextError(appErr); errStatus.Code() != codes.Unknown {
return errStatus.Err()
}
switch verr := appErr.(type) { switch verr := appErr.(type) {
case *errors.Error: case *errors.Error:
statusCode = microError(verr) statusCode = microError(verr)
@@ -551,25 +534,10 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
return status.New(statusCode, statusDesc).Err() return status.New(statusCode, statusDesc).Err()
} }
func (g *Server) newCodec(ct string) (codec.Codec, error) {
g.RLock()
defer g.RUnlock()
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
ct = ct[:idx]
}
if c, ok := g.opts.Codecs[ct]; ok {
return c, nil
}
return nil, codec.ErrUnknownContentType
}
func (g *Server) Options() server.Options { func (g *Server) Options() server.Options {
g.RLock() g.mu.RLock()
opts := g.opts opts := g.opts
g.RUnlock() g.mu.RUnlock()
return opts return opts
} }
@@ -591,39 +559,11 @@ func (g *Server) Handle(h server.Handler) error {
return nil return nil
} }
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (g *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := server.ValidateSubscriber(sb); err != nil {
return err
}
g.Lock()
if _, ok = g.subscribers[sub]; ok {
g.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
g.subscribers[sub] = nil
g.Unlock()
return nil
}
func (g *Server) Register() error { func (g *Server) Register() error {
g.RLock() g.mu.RLock()
rsvc := g.rsvc rsvc := g.rsvc
config := g.opts config := g.opts
g.RUnlock() g.mu.RUnlock()
// if service already filled, reuse it and return early // if service already filled, reuse it and return early
if rsvc != nil { if rsvc != nil {
@@ -638,7 +578,7 @@ func (g *Server) Register() error {
return err return err
} }
g.RLock() g.mu.RLock()
// Maps are ordered randomly, sort the keys for consistency // Maps are ordered randomly, sort the keys for consistency
handlerList := make([]string, 0, len(g.handlers)) handlerList := make([]string, 0, len(g.handlers))
for n := range g.handlers { for n := range g.handlers {
@@ -648,20 +588,11 @@ func (g *Server) Register() error {
sort.Strings(handlerList) sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(g.subscribers)) g.mu.RUnlock()
for e := range g.subscribers {
// Only advertise non internal subscribers
subscriberList = append(subscriberList, e)
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
g.RUnlock() g.mu.RLock()
g.RLock()
registered := g.registered registered := g.registered
g.RUnlock() g.mu.RUnlock()
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
@@ -679,8 +610,8 @@ func (g *Server) Register() error {
return nil return nil
} }
g.Lock() g.mu.Lock()
defer g.Unlock() defer g.mu.Unlock()
g.registered = true g.registered = true
g.rsvc = service g.rsvc = service
@@ -691,9 +622,9 @@ func (g *Server) Register() error {
func (g *Server) Deregister() error { func (g *Server) Deregister() error {
var err error var err error
g.RLock() g.mu.RLock()
config := g.opts config := g.opts
g.RUnlock() g.mu.RUnlock()
service, err := server.NewRegisterService(g) service, err := server.NewRegisterService(g)
if err != nil { if err != nil {
@@ -708,47 +639,27 @@ func (g *Server) Deregister() error {
return err return err
} }
g.Lock() g.mu.Lock()
g.rsvc = nil g.rsvc = nil
if !g.registered { if !g.registered {
g.Unlock() g.mu.Unlock()
return nil return nil
} }
g.registered = false g.registered = false
wg := sync.WaitGroup{} g.mu.Unlock()
for sb, subs := range g.subscribers {
for _, sub := range subs {
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(g.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err)
}
}
}(sub)
}
g.subscribers[sb] = nil
}
wg.Wait()
g.Unlock()
return nil return nil
} }
func (g *Server) Start() error { func (g *Server) Start() error {
g.RLock() g.mu.RLock()
if g.started { if g.started {
g.RUnlock() g.mu.RUnlock()
return nil return nil
} }
g.RUnlock() g.mu.RUnlock()
config := g.Options() config := g.Options()
@@ -778,27 +689,12 @@ func (g *Server) Start() error {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String()) config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
} }
g.Lock() g.mu.Lock()
g.opts.Address = ts.Addr().String() g.opts.Address = ts.Addr().String()
if len(g.opts.Advertise) == 0 { if len(g.opts.Advertise) == 0 {
g.opts.Advertise = ts.Addr().String() g.opts.Advertise = ts.Addr().String()
} }
g.Unlock() g.mu.Unlock()
// only connect if we're subscribed
if len(g.subscribers) > 0 {
// connect to the broker
if err = config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
// use RegisterCheck func before register // use RegisterCheck func before register
// nolint: nestif // nolint: nestif
@@ -815,10 +711,6 @@ func (g *Server) Start() error {
} }
} }
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept) // micro: go ts.Accept(s.accept)
go func() { go func() {
if err = g.srv.Serve(ts); err != nil { if err = g.srv.Serve(ts); err != nil {
@@ -853,9 +745,9 @@ func (g *Server) Start() error {
select { select {
// register self on interval // register self on interval
case <-t.C: case <-t.C:
g.RLock() g.mu.RLock()
registered := g.registered registered := g.registered
g.RUnlock() g.mu.RUnlock()
rerr := g.opts.RegisterCheck(g.opts.Context) rerr := g.opts.RegisterCheck(g.opts.Context)
// nolint: nestif // nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
@@ -932,29 +824,29 @@ func (g *Server) Start() error {
}() }()
// mark the server as started // mark the server as started
g.Lock() g.mu.Lock()
g.started = true g.started = true
g.Unlock() g.mu.Unlock()
return nil return nil
} }
func (g *Server) Stop() error { func (g *Server) Stop() error {
g.RLock() g.mu.RLock()
if !g.started { if !g.started {
g.RUnlock() g.mu.RUnlock()
return nil return nil
} }
g.RUnlock() g.mu.RUnlock()
ch := make(chan error) ch := make(chan error)
g.exit <- ch g.exit <- ch
err := <-ch err := <-ch
g.Lock() g.mu.Lock()
g.rsvc = nil g.rsvc = nil
g.started = false g.started = false
g.Unlock() g.mu.Unlock()
return err return err
} }

View File

@@ -3,7 +3,7 @@ package grpc
import ( import (
"reflect" "reflect"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
type rpcHandler struct { type rpcHandler struct {

47
metadata.go Normal file
View File

@@ -0,0 +1,47 @@
package grpc
import (
"context"
"go.unistack.org/micro/v4/metadata"
)
type (
rspMetadataKey struct{}
rspMetadataVal struct {
m metadata.Metadata
}
)
// AppendResponseMetadata adds metadata entries to metadata.Metadata stored in the context.
// It expects the context to contain a *rspMetadataVal value under the rspMetadataKey{} key.
// If the value is missing or invalid, the function does nothing.
//
// Note: this function is not thread-safe. Synchronization is required if used from multiple goroutines.
func AppendResponseMetadata(ctx context.Context, md metadata.Metadata) {
if md == nil {
return
}
val, ok := ctx.Value(rspMetadataKey{}).(*rspMetadataVal)
if !ok || val == nil || val.m == nil {
return
}
for key, values := range md {
val.m.Append(key, values...)
}
}
// getResponseMetadata retrieves the metadata.Metadata stored in the context.
//
// Note: this function is not thread-safe. Synchronization is required if used from multiple goroutines.
// If you plan to modify the returned metadata, make a full copy to avoid affecting shared state.
func getResponseMetadata(ctx context.Context) metadata.Metadata {
val, ok := ctx.Value(rspMetadataKey{}).(*rspMetadataVal)
if !ok || val == nil || val.m == nil {
return nil
}
return val.m
}

136
metadata_test.go Normal file
View File

@@ -0,0 +1,136 @@
package grpc
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"go.unistack.org/micro/v4/metadata"
)
func TestAppendResponseMetadata(t *testing.T) {
tests := []struct {
name string
ctx context.Context
md metadata.Metadata
expected context.Context
}{
{
name: "nil metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: nil,
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
},
{
name: "empty metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Metadata{},
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
},
{
name: "context without response metadata key",
ctx: context.Background(),
md: metadata.Pairs("key1", "val1"),
expected: context.Background(),
},
{
name: "context with nil response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, nil),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, nil),
},
{
name: "context with incorrect type in response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, struct{}{}),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, struct{}{}),
},
{
name: "context with response metadata value, but nil metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
},
{
name: "basic metadata append",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Pairs("key1", "val1"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Metadata{
"key1": []string{"val1"},
},
}),
},
{
name: "multiple values for same key",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Pairs("key1", "val1", "key1", "val2"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Metadata{
"key1": []string{"val1", "val2"},
},
}),
},
{
name: "multiple values for different keys",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: metadata.Metadata{}}),
md: metadata.Pairs("key1", "val1", "key1", "val2", "key2", "val3", "key2", "val4", "key3", "val5"),
expected: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Metadata{
"key1": []string{"val1", "val2"},
"key2": []string{"val3", "val4"},
"key3": []string{"val5"},
},
}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
AppendResponseMetadata(tt.ctx, tt.md)
require.Equal(t, tt.expected, tt.ctx)
})
}
}
func TestGetResponseMetadata(t *testing.T) {
tests := []struct {
name string
ctx context.Context
expected metadata.Metadata
}{
{
name: "context without response metadata key",
ctx: context.Background(),
expected: nil,
},
{
name: "context with nil response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, nil),
expected: nil,
},
{
name: "context with incorrect type in response metadata value",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &struct{}{}),
expected: nil,
},
{
name: "context with response metadata value, but nil metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{m: nil}),
expected: nil,
},
{
name: "valid metadata",
ctx: context.WithValue(context.Background(), rspMetadataKey{}, &rspMetadataVal{
m: metadata.Pairs("key1", "value1"),
}),
expected: metadata.Metadata{"key1": {"value1"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, getResponseMetadata(tt.ctx))
})
}
}

View File

@@ -3,7 +3,7 @@ package grpc
import ( import (
"context" "context"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )

View File

@@ -4,8 +4,7 @@ import (
"fmt" "fmt"
"testing" "testing"
_ "go.unistack.org/micro-server-grpc/v3/proto" "go.unistack.org/micro/v4/server"
"go.unistack.org/micro/v3/server"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry" "google.golang.org/protobuf/reflect/protoregistry"

View File

@@ -1,9 +1,9 @@
package grpc package grpc
import ( import (
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
var _ server.Request = &rpcRequest{} var _ server.Request = &rpcRequest{}

View File

@@ -1,9 +1,9 @@
package grpc package grpc
import ( import (
"go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
var _ server.Response = &rpcResponse{} var _ server.Response = &rpcResponse{}

View File

@@ -14,7 +14,7 @@ import (
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
) )
// Precompute the reflect type for error. Can't use error directly // Precompute the reflect type for error. Can't use error directly
@@ -47,8 +47,8 @@ type rServer struct {
// Is this an exported - upper case - name? // Is this an exported - upper case - name?
func isExported(name string) bool { func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name) r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune) return unicode.IsUpper(r)
} }
// Is this type exported or a builtin? // Is this type exported or a builtin?
@@ -123,43 +123,43 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
} }
func (server *rServer) register(rcvr interface{}) error { func (s *rServer) register(rcvr interface{}) error {
server.mu.Lock() s.mu.Lock()
defer server.mu.Unlock() defer s.mu.Unlock()
if server.serviceMap == nil { if s.serviceMap == nil {
server.serviceMap = make(map[string]*service) s.serviceMap = make(map[string]*service)
} }
s := &service{} srv := &service{}
s.typ = reflect.TypeOf(rcvr) srv.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr) srv.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name() sname := reflect.Indirect(srv.rcvr).Type().Name()
if sname == "" { if sname == "" {
return fmt.Errorf("rpc: no service name for type %v", s.typ.String()) return fmt.Errorf("rpc: no service name for type %v", srv.typ.String())
} }
if !isExported(sname) { if !isExported(sname) {
return fmt.Errorf("rpc Register: type %s is not exported", sname) return fmt.Errorf("rpc Register: type %s is not exported", sname)
} }
if _, present := server.serviceMap[sname]; present { if _, present := s.serviceMap[sname]; present {
return fmt.Errorf("rpc: service already defined: %s", sname) return fmt.Errorf("rpc: service already defined: %s", sname)
} }
s.name = sname srv.name = sname
s.method = make(map[string]*methodType) srv.method = make(map[string]*methodType)
// Install the methods // Install the methods
for m := 0; m < s.typ.NumMethod(); m++ { for m := 0; m < srv.typ.NumMethod(); m++ {
method := s.typ.Method(m) method := srv.typ.Method(m)
mt, err := prepareEndpoint(method) mt, err := prepareEndpoint(method)
if mt != nil && err == nil { if mt != nil && err == nil {
s.method[method.Name] = mt srv.method[method.Name] = mt
} else if err != nil { } else if err != nil {
return err return err
} }
} }
if len(s.method) == 0 { if len(srv.method) == 0 {
return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname) return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname)
} }
server.serviceMap[s.name] = s s.serviceMap[srv.name] = srv
return nil return nil
} }

View File

@@ -3,7 +3,7 @@ package grpc
import ( import (
"context" "context"
"go.unistack.org/micro/v3/server" "go.unistack.org/micro/v4/server"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

View File

@@ -1,258 +0,0 @@
package grpc
import (
"context"
"fmt"
"reflect"
"strings"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/server"
)
var _ server.Message = &rpcMessage{}
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
opts server.SubscriberOptions
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.NewSubscriberOptions(opts...)
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
} else {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
opts: options,
}
}
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) {
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = make(map[string]string)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
cf, err := g.newCodec(ct)
if err != nil {
return err
}
hdr := make(map[string]string, len(msg.Header))
for k, v := range msg.Header {
hdr[k] = v
}
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
opts.Hooks.EachPrev(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
if g.wg != nil {
g.wg.Add(1)
}
go func() {
if g.wg != nil {
defer g.wg.Done()
}
cerr := fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
})
results <- cerr
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Options() server.SubscriberOptions {
return s.opts
}
func (g *Server) subscribe() error {
config := g.opts
subCtx := config.Context
for sb := range g.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts := []broker.SubscribeOption{
broker.SubscribeContext(subCtx),
broker.SubscribeAutoAck(sb.Options().AutoAck),
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(config.Context, "subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), g.createSubHandler(sb, config), opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}