Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
dae26293b9 | |||
bcc1e695b3 |
8
.github/ISSUE_TEMPLATE/bug_report.md
vendored
8
.github/ISSUE_TEMPLATE/bug_report.md
vendored
@@ -1,6 +1,6 @@
|
|||||||
---
|
---
|
||||||
name: Bug report
|
name: Bug report
|
||||||
about: For reporting bugs in micro
|
about: For reporting bugs in go-micro
|
||||||
title: "[BUG]"
|
title: "[BUG]"
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
@@ -16,3 +16,9 @@ assignees: ''
|
|||||||
**How to reproduce the bug:**
|
**How to reproduce the bug:**
|
||||||
|
|
||||||
If possible, please include a minimal code snippet here.
|
If possible, please include a minimal code snippet here.
|
||||||
|
|
||||||
|
**Environment:**
|
||||||
|
Go Version: please paste `go version` output here
|
||||||
|
```
|
||||||
|
please paste `go env` output here
|
||||||
|
```
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
---
|
---
|
||||||
name: Feature request / Enhancement
|
name: Feature request / Enhancement
|
||||||
about: If you have a need not served by micro
|
about: If you have a need not served by go-micro
|
||||||
title: "[FEATURE]"
|
title: "[FEATURE]"
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
@@ -14,4 +14,4 @@ A clear and concise description of what the problem is. Ex. I'm always frustrate
|
|||||||
A clear and concise description of what you want to happen.
|
A clear and concise description of what you want to happen.
|
||||||
|
|
||||||
**Additional context**
|
**Additional context**
|
||||||
Add any other context or screenshots about the feature request here.
|
Add any other context or screenshots about the feature request here.
|
||||||
|
10
.github/ISSUE_TEMPLATE/question.md
vendored
10
.github/ISSUE_TEMPLATE/question.md
vendored
@@ -1,8 +1,14 @@
|
|||||||
---
|
---
|
||||||
name: Question
|
name: Question
|
||||||
about: Ask a question about micro
|
about: Ask a question about go-micro
|
||||||
title: ''
|
title: ''
|
||||||
labels: ''
|
labels: ''
|
||||||
assignees: ''
|
assignees: ''
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
Before asking, please check if your question has already been answered:
|
||||||
|
|
||||||
|
1. Check the documentation - https://micro.mu/docs/
|
||||||
|
2. Check the examples and plugins - https://github.com/micro/examples & https://github.com/micro/go-plugins
|
||||||
|
3. Search existing issues
|
||||||
|
28
.github/autoapprove.yml
vendored
28
.github/autoapprove.yml
vendored
@@ -1,28 +0,0 @@
|
|||||||
name: "autoapprove"
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request_target:
|
|
||||||
types: [assigned, opened, synchronize, reopened]
|
|
||||||
workflow_run:
|
|
||||||
workflows: ["prbuild"]
|
|
||||||
types:
|
|
||||||
- completed
|
|
||||||
|
|
||||||
permissions:
|
|
||||||
pull-requests: write
|
|
||||||
contents: write
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
autoapprove:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: approve
|
|
||||||
run: [ "curl -o tea https://dl.gitea.com/tea/main/tea-main-linux-amd64",
|
|
||||||
"chmod +x ./tea",
|
|
||||||
"./tea login add --name unistack --token ${{ secrets.GITHUB_TOKEN }} --url https://git.unistack.org",
|
|
||||||
"./tea pr --repo ${{ github.event.repository.name }}"
|
|
||||||
]
|
|
||||||
if: github.actor == 'vtolstov'
|
|
||||||
id: approve
|
|
||||||
with:
|
|
||||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
|
94
.github/workflows/job_sync.yml
vendored
94
.github/workflows/job_sync.yml
vendored
@@ -1,94 +0,0 @@
|
|||||||
name: sync
|
|
||||||
|
|
||||||
on:
|
|
||||||
schedule:
|
|
||||||
- cron: '*/5 * * * *'
|
|
||||||
# Allows you to run this workflow manually from the Actions tab
|
|
||||||
workflow_dispatch:
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
sync:
|
|
||||||
if: github.server_url != 'https://github.com'
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: init
|
|
||||||
run: |
|
|
||||||
git config --global user.email "vtolstov <vtolstov@users.noreply.github.com>"
|
|
||||||
git config --global user.name "github-actions[bot]"
|
|
||||||
echo "machine git.unistack.org login vtolstov password ${{ secrets.TOKEN_GITEA }}" >> /root/.netrc
|
|
||||||
echo "machine github.com login vtolstov password ${{ secrets.TOKEN_GITHUB }}" >> /root/.netrc
|
|
||||||
|
|
||||||
- name: check master
|
|
||||||
id: check_master
|
|
||||||
run: |
|
|
||||||
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
|
||||||
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/master | cut -f1)
|
|
||||||
echo "src_hash=$src_hash"
|
|
||||||
echo "dst_hash=$dst_hash"
|
|
||||||
if [ "$src_hash" != "$dst_hash" ]; then
|
|
||||||
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
|
||||||
else
|
|
||||||
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: sync master
|
|
||||||
if: steps.check_master.outputs.sync_needed == 'true'
|
|
||||||
run: |
|
|
||||||
git clone --filter=blob:none --filter=tree:0 --branch master --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
|
||||||
cd repo
|
|
||||||
git remote add --no-tags --fetch --track master upstream https://github.com/${GITHUB_REPOSITORY}
|
|
||||||
git pull --rebase upstream master
|
|
||||||
git push upstream master --progress
|
|
||||||
git push origin master --progress
|
|
||||||
cd ../
|
|
||||||
rm -rf repo
|
|
||||||
|
|
||||||
- name: check v3
|
|
||||||
id: check_v3
|
|
||||||
run: |
|
|
||||||
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
|
||||||
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v3 | cut -f1)
|
|
||||||
echo "src_hash=$src_hash"
|
|
||||||
echo "dst_hash=$dst_hash"
|
|
||||||
if [ "$src_hash" != "$dst_hash" ]; then
|
|
||||||
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
|
||||||
else
|
|
||||||
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: sync v3
|
|
||||||
if: steps.check_v3.outputs.sync_needed == 'true'
|
|
||||||
run: |
|
|
||||||
git clone --filter=blob:none --filter=tree:0 --branch v3 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
|
||||||
cd repo
|
|
||||||
git remote add --no-tags --fetch --track v3 upstream https://github.com/${GITHUB_REPOSITORY}
|
|
||||||
git pull --rebase upstream v3
|
|
||||||
git push upstream v3 --progress
|
|
||||||
git push origin v3 --progress
|
|
||||||
cd ../
|
|
||||||
rm -rf repo
|
|
||||||
|
|
||||||
- name: check v4
|
|
||||||
id: check_v4
|
|
||||||
run: |
|
|
||||||
src_hash=$(git ls-remote https://github.com/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
|
||||||
dst_hash=$(git ls-remote ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} refs/heads/v4 | cut -f1)
|
|
||||||
echo "src_hash=$src_hash"
|
|
||||||
echo "dst_hash=$dst_hash"
|
|
||||||
if [ "$src_hash" != "$dst_hash" ]; then
|
|
||||||
echo "sync_needed=true" >> $GITHUB_OUTPUT
|
|
||||||
else
|
|
||||||
echo "sync_needed=false" >> $GITHUB_OUTPUT
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: sync v4
|
|
||||||
if: steps.check_v4.outputs.sync_needed == 'true'
|
|
||||||
run: |
|
|
||||||
git clone --filter=blob:none --filter=tree:0 --branch v4 --single-branch ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} repo
|
|
||||||
cd repo
|
|
||||||
git remote add --no-tags --fetch --track v4 upstream https://github.com/${GITHUB_REPOSITORY}
|
|
||||||
git pull --rebase upstream v4
|
|
||||||
git push upstream v4 --progress
|
|
||||||
git push origin v4 --progress
|
|
||||||
cd ../
|
|
||||||
rm -rf repo
|
|
@@ -1,4 +0,0 @@
|
|||||||
# GRPC Client
|
|
||||||

|
|
||||||
|
|
||||||
This plugin is a grpc client for micro.
|
|
2
codec.go
2
codec.go
@@ -1,7 +1,7 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v4/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
)
|
)
|
||||||
|
@@ -2,10 +2,9 @@ package grpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"go.unistack.org/micro/v3/codec"
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/codec"
|
|
||||||
gmetadata "google.golang.org/grpc/metadata"
|
gmetadata "google.golang.org/grpc/metadata"
|
||||||
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockStream struct {
|
type mockStream struct {
|
||||||
|
2
error.go
2
error.go
@@ -1,7 +1,7 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v4/errors"
|
"go.unistack.org/micro/v3/errors"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
27
go.mod
27
go.mod
@@ -1,24 +1,19 @@
|
|||||||
module go.unistack.org/micro-client-grpc/v4
|
module go.unistack.org/micro-client-grpc/v3
|
||||||
|
|
||||||
go 1.23.0
|
go 1.21
|
||||||
|
|
||||||
toolchain go1.23.3
|
toolchain go1.23.1
|
||||||
|
|
||||||
require (
|
require (
|
||||||
go.unistack.org/micro/v4 v4.1.7
|
go.unistack.org/micro/v3 v3.10.91
|
||||||
google.golang.org/grpc v1.72.0
|
google.golang.org/grpc v1.67.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/ash3in/uuidv8 v1.2.0 // indirect
|
go.unistack.org/micro-proto/v3 v3.4.1 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
golang.org/x/net v0.29.0 // indirect
|
||||||
github.com/matoous/go-nanoid v1.5.1 // indirect
|
golang.org/x/sys v0.25.0 // indirect
|
||||||
github.com/spf13/cast v1.7.1 // indirect
|
golang.org/x/text v0.18.0 // indirect
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||||
golang.org/x/net v0.39.0 // indirect
|
google.golang.org/protobuf v1.34.2 // indirect
|
||||||
golang.org/x/sys v0.32.0 // indirect
|
|
||||||
golang.org/x/text v0.24.0 // indirect
|
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect
|
|
||||||
google.golang.org/protobuf v1.36.6 // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
)
|
)
|
||||||
|
73
go.sum
73
go.sum
@@ -1,59 +1,18 @@
|
|||||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
|
||||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
|
||||||
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
|
||||||
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
|
||||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
|
||||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
|
||||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
|
||||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
|
||||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
|
||||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
|
||||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
|
||||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
|
||||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo=
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
go.unistack.org/micro/v3 v3.10.91 h1:vuJY4tXwpqimwIkEJ3TozMYNVQQs+C5QMlQWPgSY/YM=
|
||||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
go.unistack.org/micro/v3 v3.10.91/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
|
||||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
|
||||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
|
||||||
github.com/matoous/go-nanoid v1.5.1 h1:aCjdvTyO9LLnTIi0fgdXhOPPvOHjpXN6Ik9DaNjIct4=
|
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
|
||||||
github.com/matoous/go-nanoid v1.5.1/go.mod h1:zyD2a71IubI24efhpvkJz+ZwfwagzgSO6UNiFsZKN7U=
|
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
|
||||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||||
github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
|
||||||
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
|
||||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
|
||||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
|
||||||
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
|
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||||
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
|
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||||
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
|
|
||||||
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
|
|
||||||
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
|
|
||||||
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
|
|
||||||
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
|
|
||||||
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
|
|
||||||
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
|
|
||||||
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9lKUk=
|
|
||||||
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
|
|
||||||
go.unistack.org/micro/v4 v4.1.7 h1:4/dSEMTqxYoHimn/8wKDohfTXi2zDy6eWXYBCnBaZdc=
|
|
||||||
go.unistack.org/micro/v4 v4.1.7/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw=
|
|
||||||
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
|
||||||
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
|
||||||
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
|
||||||
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
|
||||||
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
|
||||||
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 h1:29cjnHVylHwTzH66WfFZqgSQgnxzvWE+jvBwpZCLRxY=
|
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
|
||||||
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
|
|
||||||
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
|
|
||||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
|
||||||
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
|
177
grpc.go
177
grpc.go
@@ -6,20 +6,22 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/client"
|
"go.unistack.org/micro/v3/broker"
|
||||||
"go.unistack.org/micro/v4/codec"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v4/errors"
|
"go.unistack.org/micro/v3/codec"
|
||||||
"go.unistack.org/micro/v4/metadata"
|
"go.unistack.org/micro/v3/errors"
|
||||||
"go.unistack.org/micro/v4/options"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"go.unistack.org/micro/v4/selector"
|
"go.unistack.org/micro/v3/options"
|
||||||
"go.unistack.org/micro/v4/semconv"
|
"go.unistack.org/micro/v3/selector"
|
||||||
"go.unistack.org/micro/v4/tracer"
|
"go.unistack.org/micro/v3/semconv"
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
@@ -34,12 +36,14 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type grpcClient struct {
|
type grpcClient struct {
|
||||||
funcCall client.FuncCall
|
funcPublish client.FuncPublish
|
||||||
funcStream client.FuncStream
|
funcBatchPublish client.FuncBatchPublish
|
||||||
pool *ConnPool
|
funcCall client.FuncCall
|
||||||
opts client.Options
|
funcStream client.FuncStream
|
||||||
mu sync.RWMutex
|
pool *ConnPool
|
||||||
init bool
|
opts client.Options
|
||||||
|
sync.RWMutex
|
||||||
|
init bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// secure returns the dial option for whether its a secure or insecure connection
|
// secure returns the dial option for whether its a secure or insecure connection
|
||||||
@@ -74,26 +78,28 @@ func (g *grpcClient) secure(addr string) grpc.DialOption {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
var header gmetadata.MD
|
var header map[string]string
|
||||||
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
header = metadata.Copy(md).AsHTTP2()
|
header = make(map[string]string, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
header[strings.ToLower(k)] = v
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
header = make(gmetadata.MD, 4)
|
header = make(map[string]string, 2)
|
||||||
}
|
}
|
||||||
if opts.RequestMetadata != nil {
|
if opts.RequestMetadata != nil {
|
||||||
for k, v := range opts.RequestMetadata {
|
for k, v := range opts.RequestMetadata {
|
||||||
header[strings.ToLower(k)] = v
|
header[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// set timeout in nanoseconds
|
// set timeout in nanoseconds
|
||||||
if opts.RequestTimeout > time.Duration(0) {
|
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
|
||||||
header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
|
||||||
header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
header["content-type"] = req.ContentType()
|
||||||
}
|
|
||||||
header.Set("content-type", req.ContentType())
|
|
||||||
|
|
||||||
ctx = gmetadata.NewOutgoingContext(ctx, header)
|
md := gmetadata.New(header)
|
||||||
|
ctx = gmetadata.NewOutgoingContext(ctx, md)
|
||||||
|
|
||||||
cf, err := g.newCodec(req.ContentType())
|
cf, err := g.newCodec(req.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -179,7 +185,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
if opts.ResponseMetadata != nil {
|
if opts.ResponseMetadata != nil {
|
||||||
*opts.ResponseMetadata = metadata.New(gmd.Len())
|
*opts.ResponseMetadata = metadata.New(gmd.Len())
|
||||||
for k, v := range gmd {
|
for k, v := range gmd {
|
||||||
opts.ResponseMetadata.Append(k, v...)
|
opts.ResponseMetadata.Set(k, strings.Join(v, ","))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -187,21 +193,27 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
||||||
var header gmetadata.MD
|
var header map[string]string
|
||||||
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
header = metadata.Copy(md).AsHTTP2()
|
header = make(map[string]string, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
header[k] = v
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
header = make(gmetadata.MD, 4)
|
header = make(map[string]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.RequestTimeout > time.Duration(0) {
|
// set timeout in nanoseconds
|
||||||
header.Set("grpc-timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
if opts.StreamTimeout > time.Duration(0) {
|
||||||
header.Set("timeout", fmt.Sprintf("%dn", opts.RequestTimeout))
|
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
|
||||||
|
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
|
||||||
}
|
}
|
||||||
header.Set("content-type", req.ContentType())
|
// set the content type for the request
|
||||||
|
header["content-type"] = req.ContentType()
|
||||||
|
|
||||||
ctx = gmetadata.NewOutgoingContext(ctx, header)
|
md := gmetadata.New(header)
|
||||||
|
ctx = gmetadata.NewOutgoingContext(ctx, md)
|
||||||
|
|
||||||
cf, err := g.newCodec(req.ContentType())
|
cf, err := g.newCodec(req.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -361,8 +373,8 @@ func (g *grpcClient) maxSendMsgSizeValue() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
|
func (g *grpcClient) newCodec(ct string) (codec.Codec, error) {
|
||||||
g.mu.RLock()
|
g.RLock()
|
||||||
defer g.mu.RUnlock()
|
defer g.RUnlock()
|
||||||
|
|
||||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||||
ct = ct[:idx]
|
ct = ct[:idx]
|
||||||
@@ -398,14 +410,16 @@ func (g *grpcClient) Init(opts ...client.Option) error {
|
|||||||
|
|
||||||
// update pool configuration if the options changed
|
// update pool configuration if the options changed
|
||||||
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
||||||
g.pool.mu.Lock()
|
g.pool.Lock()
|
||||||
g.pool.size = g.opts.PoolSize
|
g.pool.size = g.opts.PoolSize
|
||||||
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
||||||
g.pool.mu.Unlock()
|
g.pool.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
g.funcCall = g.fnCall
|
g.funcCall = g.fnCall
|
||||||
g.funcStream = g.fnStream
|
g.funcStream = g.fnStream
|
||||||
|
g.funcPublish = g.fnPublish
|
||||||
|
g.funcBatchPublish = g.fnBatchPublish
|
||||||
|
|
||||||
g.opts.Hooks.EachPrev(func(hook options.Hook) {
|
g.opts.Hooks.EachPrev(func(hook options.Hook) {
|
||||||
switch h := hook.(type) {
|
switch h := hook.(type) {
|
||||||
@@ -413,9 +427,12 @@ func (g *grpcClient) Init(opts ...client.Option) error {
|
|||||||
g.funcCall = h(g.funcCall)
|
g.funcCall = h(g.funcCall)
|
||||||
case client.HookStream:
|
case client.HookStream:
|
||||||
g.funcStream = h(g.funcStream)
|
g.funcStream = h(g.funcStream)
|
||||||
|
case client.HookPublish:
|
||||||
|
g.funcPublish = h(g.funcPublish)
|
||||||
|
case client.HookBatchPublish:
|
||||||
|
g.funcBatchPublish = h(g.funcBatchPublish)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
g.init = true
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -424,6 +441,10 @@ func (g *grpcClient) Options() client.Options {
|
|||||||
return g.opts
|
return g.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
||||||
|
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
||||||
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
|
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
|
||||||
}
|
}
|
||||||
@@ -742,6 +763,84 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
|
|||||||
return nil, grr
|
return nil, grr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
||||||
|
return g.funcBatchPublish(ctx, ps, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
||||||
|
return g.publish(ctx, ps, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
|
return g.funcPublish(ctx, p, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
|
return g.publish(ctx, []client.Message{p}, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcClient) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error {
|
||||||
|
var body []byte
|
||||||
|
|
||||||
|
options := client.NewPublishOptions(opts...)
|
||||||
|
|
||||||
|
// get proxy
|
||||||
|
exchange := ""
|
||||||
|
if v, ok := os.LookupEnv("MICRO_PROXY"); ok {
|
||||||
|
exchange = v
|
||||||
|
}
|
||||||
|
// get the exchange
|
||||||
|
if len(options.Exchange) > 0 {
|
||||||
|
exchange = options.Exchange
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs := make([]*broker.Message, 0, len(ps))
|
||||||
|
|
||||||
|
omd, ok := metadata.FromOutgoingContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
omd = metadata.New(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range ps {
|
||||||
|
md := metadata.Copy(omd)
|
||||||
|
topic := p.Topic()
|
||||||
|
if len(exchange) > 0 {
|
||||||
|
topic = exchange
|
||||||
|
}
|
||||||
|
md.Set(metadata.HeaderTopic, topic)
|
||||||
|
iter := p.Metadata().Iterator()
|
||||||
|
var k, v string
|
||||||
|
for iter.Next(&k, &v) {
|
||||||
|
md.Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
md[metadata.HeaderContentType] = p.ContentType()
|
||||||
|
|
||||||
|
// passed in raw data
|
||||||
|
if d, ok := p.Payload().(*codec.Frame); ok {
|
||||||
|
body = d.Data
|
||||||
|
} else {
|
||||||
|
// use codec for payload
|
||||||
|
cf, err := g.newCodec(p.ContentType())
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
|
}
|
||||||
|
// set the body
|
||||||
|
b, err := cf.Marshal(p.Payload())
|
||||||
|
if err != nil {
|
||||||
|
return errors.InternalServerError("go.micro.client", "%+v", err)
|
||||||
|
}
|
||||||
|
body = b
|
||||||
|
}
|
||||||
|
msgs = append(msgs, &broker.Message{Header: md, Body: body})
|
||||||
|
}
|
||||||
|
|
||||||
|
return g.opts.Broker.BatchPublish(ctx, msgs,
|
||||||
|
broker.PublishContext(options.Context),
|
||||||
|
broker.PublishBodyOnly(options.BodyOnly),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func (g *grpcClient) String() string {
|
func (g *grpcClient) String() string {
|
||||||
return "grpc"
|
return "grpc"
|
||||||
}
|
}
|
||||||
@@ -821,6 +920,8 @@ func NewClient(opts ...client.Option) client.Client {
|
|||||||
|
|
||||||
c.funcCall = c.fnCall
|
c.funcCall = c.fnCall
|
||||||
c.funcStream = c.fnStream
|
c.funcStream = c.fnStream
|
||||||
|
c.funcPublish = c.fnPublish
|
||||||
|
c.funcBatchPublish = c.fnBatchPublish
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
20
grpc_pool.go
20
grpc_pool.go
@@ -16,7 +16,7 @@ type ConnPool struct {
|
|||||||
ttl int64
|
ttl int64
|
||||||
maxStreams int
|
maxStreams int
|
||||||
maxIdle int
|
maxIdle int
|
||||||
mu sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamsPool struct {
|
type streamsPool struct {
|
||||||
@@ -64,7 +64,7 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
addr = addr[strings.Index(addr, ":")+3:]
|
addr = addr[strings.Index(addr, ":")+3:]
|
||||||
}
|
}
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
p.mu.Lock()
|
p.Lock()
|
||||||
sp, ok := p.conns[addr]
|
sp, ok := p.conns[addr]
|
||||||
if !ok {
|
if !ok {
|
||||||
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0}
|
||||||
@@ -125,10 +125,10 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
}
|
}
|
||||||
// a good conn
|
// a good conn
|
||||||
conn.streams++
|
conn.streams++
|
||||||
p.mu.Unlock()
|
p.Unlock()
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
p.mu.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
// nolint (TODO need fix) create new conn)
|
// nolint (TODO need fix) create new conn)
|
||||||
cc, err := grpc.DialContext(ctx, addr, opts...)
|
cc, err := grpc.DialContext(ctx, addr, opts...)
|
||||||
@@ -138,24 +138,24 @@ func (p *ConnPool) Get(ctx context.Context, addr string, opts ...grpc.DialOption
|
|||||||
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
conn = &PoolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false}
|
||||||
|
|
||||||
// add conn to streams pool
|
// add conn to streams pool
|
||||||
p.mu.Lock()
|
p.Lock()
|
||||||
if sp.count < p.size {
|
if sp.count < p.size {
|
||||||
addConnAfter(conn, sp.head)
|
addConnAfter(conn, sp.head)
|
||||||
}
|
}
|
||||||
p.mu.Unlock()
|
p.Unlock()
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
func (p *ConnPool) Put(conn *PoolConn, err error) {
|
||||||
p.mu.Lock()
|
p.Lock()
|
||||||
p, sp, created := conn.pool, conn.sp, conn.created
|
p, sp, created := conn.pool, conn.sp, conn.created
|
||||||
// try to add conn
|
// try to add conn
|
||||||
if !conn.in && sp.count < p.size {
|
if !conn.in && sp.count < p.size {
|
||||||
addConnAfter(conn, sp.head)
|
addConnAfter(conn, sp.head)
|
||||||
}
|
}
|
||||||
if !conn.in {
|
if !conn.in {
|
||||||
p.mu.Unlock()
|
p.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -173,13 +173,13 @@ func (p *ConnPool) Put(conn *PoolConn, err error) {
|
|||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl {
|
||||||
removeConn(conn)
|
removeConn(conn)
|
||||||
p.mu.Unlock()
|
p.Unlock()
|
||||||
conn.ClientConn.Close()
|
conn.ClientConn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sp.idle++
|
sp.idle++
|
||||||
}
|
}
|
||||||
p.mu.Unlock()
|
p.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *PoolConn) Close() {
|
func (conn *PoolConn) Close() {
|
||||||
|
44
message.go
Normal file
44
message.go
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.unistack.org/micro/v3/client"
|
||||||
|
"go.unistack.org/micro/v3/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
type grpcEvent struct {
|
||||||
|
payload interface{}
|
||||||
|
topic string
|
||||||
|
contentType string
|
||||||
|
opts client.MessageOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
|
||||||
|
options := client.NewMessageOptions(opts...)
|
||||||
|
|
||||||
|
if len(options.ContentType) > 0 {
|
||||||
|
contentType = options.ContentType
|
||||||
|
}
|
||||||
|
|
||||||
|
return &grpcEvent{
|
||||||
|
payload: payload,
|
||||||
|
topic: topic,
|
||||||
|
contentType: contentType,
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEvent) ContentType() string {
|
||||||
|
return g.contentType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEvent) Topic() string {
|
||||||
|
return g.topic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEvent) Payload() interface{} {
|
||||||
|
return g.payload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEvent) Metadata() metadata.Metadata {
|
||||||
|
return g.opts.Metadata
|
||||||
|
}
|
@@ -4,7 +4,7 @@ package grpc
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/encoding"
|
"google.golang.org/grpc/encoding"
|
||||||
)
|
)
|
||||||
|
@@ -4,8 +4,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v4/codec"
|
"go.unistack.org/micro/v3/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
type grpcRequest struct {
|
type grpcRequest struct {
|
||||||
|
23
response.go
23
response.go
@@ -1,8 +1,10 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go.unistack.org/micro/v4/codec"
|
"strings"
|
||||||
"go.unistack.org/micro/v4/metadata"
|
|
||||||
|
"go.unistack.org/micro/v3/codec"
|
||||||
|
"go.unistack.org/micro/v3/metadata"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -23,19 +25,14 @@ func (r *response) Header() metadata.Metadata {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
md := metadata.New(len(meta))
|
||||||
return metadata.Metadata(meta.Copy())
|
for k, v := range meta {
|
||||||
|
md.Set(k, strings.Join(v, ","))
|
||||||
|
}
|
||||||
|
return md
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the undecoded response
|
// Read the undecoded response
|
||||||
func (r *response) Read() ([]byte, error) {
|
func (r *response) Read() ([]byte, error) {
|
||||||
f := &codec.Frame{}
|
return nil, nil
|
||||||
wrap := &wrapStream{r.stream}
|
|
||||||
|
|
||||||
_, err := wrap.Read(f.Data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return f.Data, nil
|
|
||||||
}
|
}
|
||||||
|
24
stream.go
24
stream.go
@@ -5,8 +5,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.unistack.org/micro/v4/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v4/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -19,8 +19,8 @@ type grpcStream struct {
|
|||||||
response client.Response
|
response client.Response
|
||||||
close func(err error)
|
close func(err error)
|
||||||
conn *PoolConn
|
conn *PoolConn
|
||||||
mu sync.RWMutex
|
sync.RWMutex
|
||||||
closed bool
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Context() context.Context {
|
func (g *grpcStream) Context() context.Context {
|
||||||
@@ -88,15 +88,15 @@ func (g *grpcStream) RecvMsg(msg interface{}) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) Error() error {
|
func (g *grpcStream) Error() error {
|
||||||
g.mu.RLock()
|
g.RLock()
|
||||||
defer g.mu.RUnlock()
|
defer g.RUnlock()
|
||||||
return g.err
|
return g.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) setError(e error) {
|
func (g *grpcStream) setError(e error) {
|
||||||
g.mu.Lock()
|
g.Lock()
|
||||||
g.err = e
|
g.err = e
|
||||||
g.mu.Unlock()
|
g.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the gRPC send stream
|
// Close the gRPC send stream
|
||||||
@@ -105,8 +105,8 @@ func (g *grpcStream) setError(e error) {
|
|||||||
// stream should still be able to receive after this function call
|
// stream should still be able to receive after this function call
|
||||||
// TODO: should the conn be closed in another way?
|
// TODO: should the conn be closed in another way?
|
||||||
func (g *grpcStream) Close() error {
|
func (g *grpcStream) Close() error {
|
||||||
g.mu.Lock()
|
g.Lock()
|
||||||
defer g.mu.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
if g.closed {
|
if g.closed {
|
||||||
return nil
|
return nil
|
||||||
@@ -125,8 +125,8 @@ func (g *grpcStream) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcStream) CloseSend() error {
|
func (g *grpcStream) CloseSend() error {
|
||||||
g.mu.Lock()
|
g.Lock()
|
||||||
defer g.mu.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
if g.closed {
|
if g.closed {
|
||||||
return nil
|
return nil
|
||||||
|
Reference in New Issue
Block a user