Compare commits

..

23 Commits

Author SHA1 Message Date
be904bc534 send metadata as lowercase
Some checks failed
coverage / build (push) Successful in 1m58s
test / test (push) Successful in 4m45s
sync / sync (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-05-02 17:48:26 +03:00
b23469853f update ci (#189)
All checks were successful
sync / sync (push) Successful in 34s
2025-05-01 19:15:15 +03:00
afebe71814 [v4] fix ci pipeline (#188)
All checks were successful
coverage / build (push) Successful in 1m23s
test / test (push) Successful in 2m10s
sync / sync (push) Successful in 3m4s
* attempt to fix coverage/lint/test job

* add readme

* Apply Code Coverage Badge

* add readme

---------

Co-authored-by: pugnack <pugnack@users.noreply.github.com>
2025-04-28 09:27:45 +03:00
0e122ca5e3 improve sync
All checks were successful
sync / sync (push) Successful in 40s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 21:03:16 +03:00
be7e042282 update deps
Some checks failed
coverage / build (push) Failing after 1m52s
test / test (push) Successful in 4m57s
sync / sync (push) Successful in 10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 16:28:15 +03:00
ab7da93001 update deps
Some checks failed
coverage / build (push) Failing after 1m31s
test / test (push) Successful in 3m22s
sync / sync (push) Successful in 12s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2025-04-27 16:16:55 +03:00
75437a46c8 prepare v4 (#139)
All checks were successful
test / test (push) Successful in 1m55s
move to v4 micro

Co-authored-by: Василий Толстов <v.tolstov@unistack.org>
Co-authored-by: Александр Толстихин <tolstihin1996@mail.ru>
Reviewed-on: #139
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2025-03-03 09:53:41 +03:00
cee07d11d4 Merge pull request 'move set content-type in client publish' (#138) from devstigneev/micro-client-grpc:v3 into v3
All checks were successful
test / test (push) Successful in 3m5s
Reviewed-on: #138
2025-01-18 15:37:52 +03:00
d5360e2804 move set content-type in client publish
All checks were successful
lint / lint (pull_request) Successful in 1m37s
test / test (pull_request) Successful in 3m5s
2025-01-17 17:52:22 +03:00
2d23f347eb Merge pull request 'update hooks calling and fix errors create' (#137) from devstigneev/micro-client-grpc:v3 into v3
All checks were successful
test / test (push) Successful in 3m23s
Reviewed-on: #137
2024-12-19 23:49:49 +03:00
78664a34ed add test
All checks were successful
lint / lint (pull_request) Successful in 54s
test / test (pull_request) Successful in 2m17s
2024-12-19 23:36:02 +03:00
bec0e310e9 update hooks calling and fix errors create
Some checks failed
lint / lint (pull_request) Failing after 1m15s
test / test (pull_request) Failing after 12m32s
2024-12-19 15:20:07 +03:00
92436e9016 Update workflows (#136)
All checks were successful
test / test (push) Successful in 2m48s
- Rename workflows
- pr -> job_lint
- build -> job_test
- update golangci

Co-authored-by: Aleksandr Tolstikhin <atolstikhin@mtsbank.ru>
Reviewed-on: #136
Co-authored-by: Александр Толстихин <tolstihin1996@mail.ru>
Co-committed-by: Александр Толстихин <tolstihin1996@mail.ru>
2024-12-11 00:47:18 +03:00
25618a3859 update deps
Some checks failed
build / lint (push) Failing after 8s
build / test (push) Failing after 8s
codeql / analyze (go) (push) Failing after 10s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-20 18:03:39 +03:00
b22f150601 update to latest micro
Some checks failed
build / test (push) Failing after 7s
build / lint (push) Failing after 8s
codeql / analyze (go) (push) Failing after 12s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-09-17 13:11:39 +03:00
71bcb63b60 fixup trace stream
Some checks failed
codeql / analyze (go) (push) Failing after 1m44s
build / test (push) Failing after 1m49s
build / lint (push) Successful in 9m17s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 00:24:46 +03:00
698bfbc6f1 meter and tracing
Some checks failed
build / test (push) Failing after 1m45s
codeql / analyze (go) (push) Failing after 1m44s
build / lint (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-04-23 00:20:29 +03:00
209042de3a Merge pull request 'issue_132' (#134) from devstigneev/micro-client-grpc:issue_132 into v3
Some checks failed
build / test (push) Failing after 1m40s
codeql / analyze (go) (push) Failing after 2m4s
build / lint (push) Successful in 9m23s
Reviewed-on: #134
2024-02-29 23:07:33 +03:00
990685628d Merge remote-tracking branch 'origin/v3' into issue_132
Some checks failed
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 10s
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
# Conflicts:
#	grpc.go
2024-02-29 17:29:50 +03:00
0f8ead6acc add defaultConfigService
Some checks failed
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 10s
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
2024-02-29 16:58:44 +03:00
2fbcee7b59 export grpc pool conn
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 19:23:01 +03:00
19a469c4e2 export grpc pool conn
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 19:14:56 +03:00
2a6a93a792 export grpc conn pool
Some checks are pending
build / test (push) Waiting to run
build / lint (push) Waiting to run
codeql / analyze (go) (push) Waiting to run
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-14 18:09:34 +03:00
27 changed files with 566 additions and 628 deletions

View File

@@ -1,19 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

View File

@@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@@ -1,47 +0,0 @@
name: build
on:
push:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

View File

@@ -1,78 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@@ -1,27 +0,0 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

53
.github/workflows/job_coverage.yml vendored Normal file
View File

@@ -0,0 +1,53 @@
name: coverage
on:
push:
branches: [ main, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
pull_request:
branches: [ main, v3, v4 ]
jobs:
build:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: test coverage
run: |
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out
- name: coverage badge
uses: tj-actions/coverage-badge-go@v2
with:
green: 80
filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4
name: autocommit
with:
commit_message: Apply Code Coverage Badge
skip_fetch: false
skip_checkout: false
file_pattern: ./README.md
- name: push
if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master
with:
github_token: ${{ github.token }}
branch: ${{ github.ref }}

29
.github/workflows/job_lint.yml vendored Normal file
View File

@@ -0,0 +1,29 @@
name: lint
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run lint
uses: golangci/golangci-lint-action@v6
with:
version: 'latest'

52
.github/workflows/job_sync.yml vendored Normal file
View File

@@ -0,0 +1,52 @@
name: sync
on:
schedule:
- cron: '*/5 * * * *'
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
sync:
if: github.server_url != 'https://github.com'
runs-on: ubuntu-latest
steps:
- name: init
run: |
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
git config --global user.name "github-actions[bot]"
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
- name: sync master
run: |
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
git merge upstream/master
git push upstream master --progress
git push origin master --progress
cd ../
rm -rf repo
- name: sync v3
run: |
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
git merge upstream/v3
git push upstream v3 --progress
git push origin v3 --progress
cd ../
rm -rf repo
- name: sync v4
run: |
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
cd repo
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
git merge upstream/v4
git push upstream v4 --progress
git push origin v4 --progress
cd ../
rm -rf repo

31
.github/workflows/job_test.yml vendored Normal file
View File

@@ -0,0 +1,31 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup deps
run: go get -v ./...
- name: run test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...

50
.github/workflows/job_tests.yml vendored Normal file
View File

@@ -0,0 +1,50 @@
name: test
on:
pull_request:
types: [opened, reopened, synchronize]
branches: [ master, v3, v4 ]
push:
branches: [ master, v3, v4 ]
paths-ignore:
- '.github/**'
- '.gitea/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v4
with:
filter: 'blob:none'
- name: checkout tests
uses: actions/checkout@v4
with:
ref: master
filter: 'blob:none'
repository: unistack-org/micro-tests
path: micro-tests
- name: setup go
uses: actions/setup-go@v5
with:
cache-dependency-path: "**/*.sum"
go-version: 'stable'
- name: setup go work
env:
GOWORK: ${{ github.workspace }}/go.work
run: |
go work init
go work use .
go work use micro-tests
- name: setup deps
env:
GOWORK: ${{ github.workspace }}/go.work
run: go get -v ./...
- name: run tests
env:
INTEGRATION_TESTS: yes
GOWORK: ${{ github.workspace }}/go.work
run: |
cd micro-tests
go test -mod readonly -v ./... || true

View File

@@ -1,47 +0,0 @@
name: prbuild
on:
pull_request:
branches:
- master
- v3
jobs:
test:
name: test
runs-on: ubuntu-latest
steps:
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
- name: checkout
uses: actions/checkout@v3
- name: cache
uses: actions/cache@v3
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: deps
run: go get -v -t -d ./...
- name: test
env:
INTEGRATION_TESTS: yes
run: go test -mod readonly -v ./...
lint:
name: lint
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: lint
uses: golangci/golangci-lint-action@v3.4.0
continue-on-error: true
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.30
# Optional: working directory, useful for monorepos
# working-directory: somedir
# Optional: golangci-lint command line arguments.
# args: --issues-exit-code=0
# Optional: show only new issues if it's a pull request. The default value is `false`.
# only-new-issues: true

24
.gitignore vendored Normal file
View File

@@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

View File

@@ -1,44 +1,5 @@
run:
concurrency: 4
deadline: 5m
concurrency: 8
timeout: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

4
README.md Normal file
View File

@@ -0,0 +1,4 @@
# GRPC Client
![Coverage](https://img.shields.io/badge/Coverage-2.3%25-red)
This plugin is a grpc client for micro.

View File

@@ -1,9 +1,7 @@
package grpc
import (
"io"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v4/codec"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)
@@ -65,63 +63,3 @@ func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option)
}
return w.Codec.Unmarshal(d, v)
}
/*
type grpcCodec struct {
grpc.ServerStream
// headers
id string
target string
method string
endpoint string
c encoding.Codec
}
*/
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
/*
if m == nil {
m = codec.NewMessage(codec.Request)
}
if md, ok := metadata.FromIncomingContext(g.ServerStream.Context()); ok {
if m.Header == nil {
m.Header = meta.New(len(md))
}
for k, v := range md {
m.Header[k] = strings.Join(v, ",")
}
}
m.Id = g.id
m.Target = g.target
m.Method = g.method
m.Endpoint = g.endpoint
*/
return nil
}
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
// caller has requested a frame
if m, ok := v.(*codec.Frame); ok {
_, err := conn.Read(m.Data)
return err
}
return codec.ErrInvalidMessage
}
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body
if v != nil {
b, err := w.Marshal(v)
if err != nil {
return err
}
m.Body = b
}
// write the body using the framing codec
_, err := conn.Write(m.Body)
return err
}

67
codec_test.go Normal file
View File

@@ -0,0 +1,67 @@
package grpc
import (
"context"
"testing"
"go.unistack.org/micro/v4/codec"
gmetadata "google.golang.org/grpc/metadata"
)
type mockStream struct {
msg any
}
func (m mockStream) Header() (gmetadata.MD, error) {
return nil, nil
}
func (m mockStream) Trailer() gmetadata.MD {
return nil
}
func (m mockStream) CloseSend() error {
return nil
}
func (m mockStream) Context() context.Context {
return nil
}
func (m *mockStream) SendMsg(msg any) error {
m.msg = msg
return nil
}
func (m *mockStream) RecvMsg(msg any) error {
c := msg.(*codec.Frame)
c.Data = m.msg.(*codec.Frame).Data
return nil
}
func Test_ReadWrap(t *testing.T) {
wp := wrapStream{
&mockStream{},
}
write, err := wp.Write([]byte("test_data"))
if err != nil {
t.Fatal(err)
}
if write != 9 {
t.Error("uncorrected number wrote bytes")
}
b := make([]byte, write)
read, err := wp.Read(b)
if err != nil {
t.Fatal(err)
}
if read != 9 || string(b) != "test_data" {
t.Error("uncorrected number wrote bytes or data")
}
}

View File

@@ -1,7 +1,7 @@
package grpc
import (
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v4/errors"
"google.golang.org/grpc/status"
)

27
go.mod
View File

@@ -1,17 +1,24 @@
module go.unistack.org/micro-client-grpc/v3
module go.unistack.org/micro-client-grpc/v4
go 1.20
go 1.23.0
toolchain go1.23.3
require (
go.unistack.org/micro/v3 v3.10.28
google.golang.org/grpc v1.59.0
go.unistack.org/micro/v4 v4.1.7
google.golang.org/grpc v1.72.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/protobuf v1.31.0 // indirect
github.com/ash3in/uuidv8 v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/matoous/go-nanoid v1.5.1 // indirect
github.com/spf13/cast v1.7.1 // indirect
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

81
go.sum
View File

@@ -1,22 +1,59 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
go.unistack.org/micro/v3 v3.10.28 h1:/87lGekrmi0/66pioy+Nh8lVUBBYnVqKoHiNYX5OmMI=
go.unistack.org/micro/v3 v3.10.28/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
go.unistack.org/micro/v4 v4.1.7 h1:4/dSEMTqxYoHimn/8wKDohfTXi2zDy6eWXYBCnBaZdc=
go.unistack.org/micro/v4 v4.1.7/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 h1:29cjnHVylHwTzH66WfFZqgSQgnxzvWE+jvBwpZCLRxY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

284
grpc.go
View File

@@ -1,28 +1,32 @@
// Package grpc provides a gRPC client
package grpc // import "go.unistack.org/micro-client-grpc/v3"
// Package grpc provides a gRPC client for micro framework
package grpc
import (
"context"
"crypto/tls"
"fmt"
"net"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/selector"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/selector"
"go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v4/tracer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const (
@@ -30,8 +34,10 @@ const (
)
type grpcClient struct {
pool *ConnPool
opts client.Options
funcCall client.FuncCall
funcStream client.FuncStream
pool *ConnPool
opts client.Options
sync.RWMutex
init bool
}
@@ -68,36 +74,33 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
}
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string
var header map[string][]string
if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = make(map[string]string, len(md))
for k, v := range md {
header[strings.ToLower(k)] = v
}
header = metadata.Copy(md).AsHTTP2()
} else {
header = make(map[string]string, 2)
header = make(map[string][]string, 2)
}
if opts.RequestMetadata != nil {
for k, v := range opts.RequestMetadata {
header[k] = v
header[strings.ToLower(k)] = v
}
}
// set timeout in nanoseconds
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header["content-type"] = req.ContentType()
header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
header["content-type"] = append(header["content-type"], req.ContentType())
md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
ctx = gmetadata.NewOutgoingContext(ctx, header)
cf, err := g.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "%+v", err)
}
maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue()
cfgService := g.serviceConfig()
var grr error
@@ -116,6 +119,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
grpc.WithDefaultServiceConfig(cfgService),
}
if opts := g.getGrpcDialOptions(g.opts.Context); opts != nil {
@@ -135,7 +139,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
return errors.InternalServerError("go.micro.client", "Error sending request: %v", err)
}
defer func() {
// defer execution of release
@@ -173,7 +177,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
if opts.ResponseMetadata != nil {
*opts.ResponseMetadata = metadata.New(gmd.Len())
for k, v := range gmd {
opts.ResponseMetadata.Set(k, strings.Join(v, ","))
opts.ResponseMetadata.Append(k, v...)
}
}
@@ -181,31 +185,27 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
}
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
var header map[string]string
var header map[string][]string
if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = make(map[string]string, len(md))
for k, v := range md {
header[k] = v
}
header = metadata.Copy(md).AsHTTP2()
} else {
header = make(map[string]string)
header = make(map[string][]string)
}
// set timeout in nanoseconds
if opts.StreamTimeout > time.Duration(0) {
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
}
// set the content type for the request
header["content-type"] = req.ContentType()
header["content-type"] = append(header["content-type"], req.ContentType())
md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
ctx = gmetadata.NewOutgoingContext(ctx, header)
cf, err := g.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "%+v", err)
}
var dialCtx context.Context
@@ -221,6 +221,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
maxRecvMsgSize := g.maxRecvMsgSizeValue()
maxSendMsgSize := g.maxSendMsgSizeValue()
cfgService := g.serviceConfig()
grpcDialOptions := []grpc.DialOption{
g.secure(addr),
@@ -228,6 +229,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
grpc.WithDefaultServiceConfig(cfgService),
}
if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
@@ -244,7 +246,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
cc, err := g.pool.Get(dialCtx, addr, grpcDialOptions...)
if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
return errors.InternalServerError("go.micro.client", "Error sending request: %v", err)
}
desc := &grpc.StreamDesc{
@@ -277,7 +279,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// release the connection
g.pool.Put(cc, err)
// now return the error
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
return errors.InternalServerError("go.micro.client", "Error creating stream: %v", err)
}
// set request codec
@@ -372,6 +374,17 @@ func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType
}
func (g *grpcClient) serviceConfig() string {
if g.opts.Context == nil {
return DefaultServiceConfig
}
v := g.opts.Context.Value(serviceConfigKey{})
if v == nil {
return DefaultServiceConfig
}
return v.(string)
}
func (g *grpcClient) Init(opts ...client.Option) error {
if len(opts) == 0 && g.init {
return nil
@@ -391,25 +404,17 @@ func (g *grpcClient) Init(opts ...client.Option) error {
g.pool.Unlock()
}
if err := g.opts.Broker.Init(); err != nil {
return err
}
if err := g.opts.Tracer.Init(); err != nil {
return err
}
if err := g.opts.Router.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil {
return err
}
if err := g.opts.Meter.Init(); err != nil {
return err
}
if err := g.opts.Transport.Init(); err != nil {
return err
}
g.funcCall = g.fnCall
g.funcStream = g.fnStream
g.opts.Hooks.EachPrev(func(hook options.Hook) {
switch h := hook.(type) {
case client.HookCall:
g.funcCall = h(g.funcCall)
case client.HookStream:
g.funcStream = h(g.funcStream)
}
})
g.init = true
return nil
@@ -419,10 +424,6 @@ func (g *grpcClient) Options() client.Options {
return g.opts
}
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
}
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
}
@@ -433,6 +434,32 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
} else if rsp == nil {
return errors.InternalServerError("go.micro.client", "rsp is nil")
}
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
err := g.funcCall(ctx, req, rsp, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return err
}
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts
callOpts := g.opts.CallOptions
@@ -464,11 +491,6 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
// make copy of call method
gcall := g.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gcall = callOpts.CallWrappers[i-1](gcall)
}
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
callOpts.Router = g.opts.Router
@@ -490,7 +512,7 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "%+v", err)
}
// only sleep if greater than 0
@@ -505,7 +527,7 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
// TODO apply any filtering here
routes, err = g.opts.Lookup(ctx, req, callOpts)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "%+v", err)
}
// balance the list of nodes
@@ -568,6 +590,31 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
}
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
stream, err := g.funcStream(ctx, req, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := status.Convert(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
}
return stream, err
}
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
// make a copy of call opts
callOpts := g.opts.CallOptions
for _, opt := range opts {
@@ -586,11 +633,6 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
// make a copy of stream
gstream := g.stream
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
gstream = callOpts.CallWrappers[i-1](gstream)
}
// use the router passed as a call option, or fallback to the rpc clients router
if callOpts.Router == nil {
callOpts.Router = g.opts.Router
@@ -612,7 +654,7 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "%+v", err)
}
// only sleep if greater than 0
@@ -627,7 +669,7 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
// TODO apply any filtering here
routes, err = g.opts.Lookup(ctx, req, callOpts)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "%+v", err)
}
// balance the list of nodes
@@ -700,76 +742,6 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, grr
}
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, ps, opts...)
}
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return g.publish(ctx, []client.Message{p}, opts...)
}
func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
var body []byte
options := client.NewPublishOptions(opts...)
// get proxy
exchange := ""
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
exchange = v
}
// get the exchange
if len(options.Exchange) > 0 {
exchange = options.Exchange
}
msgs := make([]*broker.Message, 0, len(ps))
omd, ok := metadata.FromOutgoingContext(ctx)
if !ok {
omd = metadata.New(2)
}
for _, p := range ps {
md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
}
md.Set(metadata.HeaderTopic, topic)
iter := p.Metadata().Iterator()
var k, v string
for iter.Next(&k, &v) {
md.Set(k, v)
}
// passed in raw data
if d, ok := p.Payload().(*codec.Frame); ok {
body = d.Data
} else {
// use codec for payload
cf, err := g.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
msgs = append(msgs, &broker.Message{Header: md, Body: body})
}
return g.opts.Broker.BatchPublish(ctx, msgs,
broker.PublishContext(options.Context),
broker.PublishBodyOnly(options.BodyOnly),
)
}
func (g *grpcClient) String() string {
return "grpc"
}
@@ -829,23 +801,16 @@ func NewClient(opts ...client.Option) client.Client {
options.ContentType = DefaultContentType
}
rc := &grpcClient{
c := &grpcClient{
opts: options,
}
rc.pool = NewConnPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
c.pool = NewConnPool(options.PoolSize, options.PoolTTL, c.poolMaxIdle(), c.poolMaxStreams())
c := client.Client(rc)
// wrap in reverse
for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
if rc.opts.Context != nil {
if codecs, ok := rc.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
if c.opts.Context != nil {
if codecs, ok := c.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs {
rc.opts.Codecs[k] = &wrapGrpcCodec{v}
c.opts.Codecs[k] = &wrapGrpcCodec{v}
}
}
}
@@ -854,5 +819,8 @@ func NewClient(opts ...client.Option) client.Client {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
c.funcCall = c.fnCall
c.funcStream = c.fnStream
return c
}

View File

@@ -21,22 +21,22 @@ type ConnPool struct {
type streamsPool struct {
// head of list
head *poolConn
head *PoolConn
// busy conns list
busy *poolConn
busy *PoolConn
// the siza of list
count int
// idle conn
idle int
}
type poolConn struct {
type PoolConn struct {
err error
*grpc.ClientConn
next *poolConn
next *PoolConn
pool *ConnPool
sp *streamsPool
pre *poolConn
pre *PoolConn
addr string
streams int
created int64
@@ -59,7 +59,7 @@ func NewConnPool(size int, ttl time.Duration, idle int, ms int) *ConnPool {
}
}
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) {
func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption) (*PoolConn, error) {
if strings.HasPrefix(addr, "http") {
addr = addr[strings.Index(addr, ":")+3:]
}
@@ -67,7 +67,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
p.Lock()
sp, ok := p.conns[addr]
if !ok {
sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0}
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
p.conns[addr] = sp
}
// while we have conns check streams and then return one
@@ -130,12 +130,12 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
}
p.Unlock()
// create new conn)
// nolint (TODO need fix) create new conn)
cc, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, err
}
conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
// add conn to streams pool
p.Lock()
@@ -147,7 +147,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
return conn, nil
}
func (p *ConnPool) Put(conn *poolConn, err error) {
func (p *ConnPool) Put(conn *PoolConn, err error) {
p.Lock()
p, sp, created := conn.pool, conn.sp, conn.created
// try to add conn
@@ -182,11 +182,11 @@ func (p *ConnPool) Put(conn *poolConn, err error) {
p.Unlock()
}
func (conn *poolConn) Close() {
func (conn *PoolConn) Close() {
conn.pool.Put(conn, conn.err)
}
func removeConn(conn *poolConn) {
func removeConn(conn *PoolConn) {
if conn.pre != nil {
conn.pre.next = conn.next
}
@@ -199,7 +199,7 @@ func removeConn(conn *poolConn) {
conn.sp.count--
}
func addConnAfter(conn *poolConn, after *poolConn) {
func addConnAfter(conn *PoolConn, after *PoolConn) {
conn.next = after.next
conn.pre = after
if after.next != nil {

View File

@@ -1,44 +0,0 @@
package grpc
import (
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
)
type grpcEvent struct {
payload interface{}
topic string
contentType string
opts client.MessageOptions
}
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
options := client.NewMessageOptions(opts...)
if len(options.ContentType) > 0 {
contentType = options.ContentType
}
return &grpcEvent{
payload: payload,
topic: topic,
contentType: contentType,
opts: options,
}
}
func (g *grpcEvent) ContentType() string {
return g.contentType
}
func (g *grpcEvent) Topic() string {
return g.topic
}
func (g *grpcEvent) Payload() interface{} {
return g.payload
}
func (g *grpcEvent) Metadata() metadata.Metadata {
return g.opts.Metadata
}

View File

@@ -4,13 +4,13 @@ package grpc
import (
"context"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v4/client"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
)
var (
// DefaultPoolMaxStreams maximum streams on a connectioin
// DefaultPoolMaxStreams maximum streams on a connection
// (20)
DefaultPoolMaxStreams = 20
@@ -25,6 +25,9 @@ var (
// DefaultMaxSendMsgSize maximum message that client can send
// (4 MB).
DefaultMaxSendMsgSize = 1024 * 1024 * 4
// DefaultServiceConfig enable load balancing
DefaultServiceConfig = `{"loadBalancingPolicy":"round_robin"}`
)
type poolMaxStreams struct{}
@@ -115,3 +118,14 @@ func CallOptions(opts ...grpc.CallOption) client.CallOption {
o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts)
}
}
type serviceConfigKey struct{}
func ServiceConfig(str string) client.CallOption {
return func(options *client.CallOptions) {
if options.Context == nil {
options.Context = context.Background()
}
options.Context = context.WithValue(options.Context, serviceConfigKey{}, str)
}
}

View File

@@ -4,8 +4,8 @@ import (
"fmt"
"strings"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
)
type grpcRequest struct {

View File

@@ -1,15 +1,13 @@
package grpc
import (
"strings"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/metadata"
"google.golang.org/grpc"
)
type response struct {
conn *poolConn
conn *PoolConn
stream grpc.ClientStream
codec codec.Codec
}
@@ -25,18 +23,19 @@ func (r *response) Header() metadata.Metadata {
if err != nil {
return nil
}
md := metadata.New(len(meta))
for k, v := range meta {
md.Set(k, strings.Join(v, ","))
}
return md
return metadata.Metadata(meta.Copy())
}
// Read the undecoded response
func (r *response) Read() ([]byte, error) {
f := &codec.Frame{}
if err := r.codec.ReadBody(&wrapStream{r.stream}, f); err != nil {
wrap := &wrapStream{r.stream}
_, err := wrap.Read(f.Data)
if err != nil {
return nil, err
}
return f.Data, nil
}

View File

@@ -5,7 +5,8 @@ import (
"io"
"sync"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/tracer"
"google.golang.org/grpc"
)
@@ -17,7 +18,7 @@ type grpcStream struct {
request client.Request
response client.Response
close func(err error)
conn *poolConn
conn *PoolConn
sync.RWMutex
closed bool
}
@@ -111,6 +112,12 @@ func (g *grpcStream) Close() error {
return nil
}
if sp, ok := tracer.SpanFromContext(g.context); ok && sp != nil {
if g.err != nil {
sp.SetStatus(tracer.SpanStatusError, g.err.Error())
}
sp.Finish()
}
// close the connection
g.closed = true
g.close(g.err)