Compare commits

..

No commits in common. "master" and "v3.3.2" have entirely different histories.

28 changed files with 640 additions and 1017 deletions

View File

@ -1,19 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
# Maintain dependencies for GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"
# Maintain dependencies for Golang
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"

20
.github/renovate.json vendored Normal file
View File

@ -0,0 +1,20 @@
{
"extends": [
"config:base"
],
"postUpdateOptions": ["gomodTidy"],
"packageRules": [
{
"matchUpdateTypes": ["minor", "patch", "pin", "digest"],
"automerge": true
},
{
"groupName": "all deps",
"separateMajorMinor": true,
"groupSlug": "all",
"packagePatterns": [
"*"
]
}
]
}

13
.github/stale.sh vendored Executable file
View File

@ -0,0 +1,13 @@
#!/bin/bash -ex
export PATH=$PATH:$(pwd)/bin
export GO111MODULE=on
export GOBIN=$(pwd)/bin
#go get github.com/rvflash/goup@v0.4.1
#goup -v ./...
#go get github.com/psampaz/go-mod-outdated@v0.6.0
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
#go list -u -m -json all | go-mod-outdated -update

View File

@ -1,20 +0,0 @@
name: "autoapprove"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
autoapprove:
runs-on: ubuntu-latest
steps:
- name: approve
uses: hmarr/auto-approve-action@v3
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
id: approve
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,21 +0,0 @@
name: "automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'vtolstov'
steps:
- name: merge
id: merge
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
push: push:
branches: branches:
- master - master
- v3
jobs: jobs:
test: test:
name: test name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup - name: setup
uses: actions/setup-go@v3 uses: actions/setup-go@v2
with: with:
go-version: 1.17 go-version: 1.16
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: cache - name: cache
uses: actions/cache@v3 uses: actions/cache@v2
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: lint - name: lint
uses: golangci/golangci-lint-action@v3.4.0 uses: golangci/golangci-lint-action@v2
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

View File

@ -1,78 +0,0 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "codeql"
on:
workflow_run:
workflows: ["prbuild"]
types:
- completed
push:
branches: [ master, v3 ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master, v3 ]
schedule:
- cron: '34 1 * * 0'
jobs:
analyze:
name: analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
steps:
- name: checkout
uses: actions/checkout@v3
- name: setup
uses: actions/setup-go@v3
with:
go-version: 1.17
# Initializes the CodeQL tools for scanning.
- name: init
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: analyze
uses: github/codeql-action/analyze@v2

View File

@ -1,27 +0,0 @@
name: "dependabot-automerge"
on:
pull_request_target:
types: [assigned, opened, synchronize, reopened]
permissions:
pull-requests: write
contents: write
jobs:
automerge:
runs-on: ubuntu-latest
if: github.actor == 'dependabot[bot]'
steps:
- name: metadata
id: metadata
uses: dependabot/fetch-metadata@v1.3.6
with:
github-token: "${{ secrets.TOKEN }}"
- name: merge
id: merge
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
run: gh pr merge --auto --merge "$PR_URL"
env:
PR_URL: ${{github.event.pull_request.html_url}}
GITHUB_TOKEN: ${{secrets.TOKEN}}

View File

@ -3,20 +3,19 @@ on:
pull_request: pull_request:
branches: branches:
- master - master
- v3
jobs: jobs:
test: test:
name: test name: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: setup - name: setup
uses: actions/setup-go@v3 uses: actions/setup-go@v2
with: with:
go-version: 1.17 go-version: 1.16
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: cache - name: cache
uses: actions/cache@v3 uses: actions/cache@v2
with: with:
path: ~/go/pkg/mod path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
@ -32,9 +31,9 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: checkout - name: checkout
uses: actions/checkout@v3 uses: actions/checkout@v2
- name: lint - name: lint
uses: golangci/golangci-lint-action@v3.4.0 uses: golangci/golangci-lint-action@v2
continue-on-error: true continue-on-error: true
with: with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.

24
.gitignore vendored
View File

@ -1,24 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

View File

@ -1,44 +0,0 @@
run:
concurrency: 4
deadline: 5m
issues-exit-code: 1
tests: true
linters-settings:
govet:
check-shadowing: true
enable:
- fieldalignment
linters:
enable:
- govet
- deadcode
- errcheck
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- bodyclose
- gci
- goconst
- gocritic
- gosimple
- gofmt
- gofumpt
- goimports
- golint
- gosec
- makezero
- misspell
- nakedret
- nestif
- nilerr
- noctx
- prealloc
- unconvert
- unparam
disable-all: false

View File

@ -3,14 +3,25 @@ package grpc
import ( import (
"io" "io"
"go.unistack.org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
var ( type wrapStream struct{ grpc.ServerStream }
_ codec.Codec = &wrapGrpcCodec{}
_ encoding.Codec = &wrapMicroCodec{} func (w *wrapStream) Write(d []byte) (int, error) {
) n := len(d)
err := w.ServerStream.SendMsg(&codec.Frame{Data: d})
return n, err
}
func (w *wrapStream) Read(d []byte) (int, error) {
m := &codec.Frame{}
err := w.ServerStream.RecvMsg(m)
d = m.Data
return len(d), err
}
type wrapMicroCodec struct{ codec.Codec } type wrapMicroCodec struct{ codec.Codec }
@ -18,54 +29,50 @@ func (w *wrapMicroCodec) Name() string {
return w.Codec.String() return w.Codec.String()
} }
func (w *wrapMicroCodec) Marshal(v interface{}) ([]byte, error) {
return w.Codec.Marshal(v)
}
func (w *wrapMicroCodec) Unmarshal(d []byte, v interface{}) error {
return w.Codec.Unmarshal(d, v)
}
type wrapGrpcCodec struct{ encoding.Codec } type wrapGrpcCodec struct{ encoding.Codec }
func (w *wrapGrpcCodec) String() string { func (w *wrapGrpcCodec) String() string {
return w.Codec.Name() return w.Codec.Name()
} }
func (w *wrapGrpcCodec) Marshal(v interface{}, opts ...codec.Option) ([]byte, error) { func (w *wrapGrpcCodec) Marshal(v interface{}) ([]byte, error) {
if m, ok := v.(*codec.Frame); ok { switch m := v.(type) {
case *codec.Frame:
return m.Data, nil return m.Data, nil
} }
return w.Codec.Marshal(v) return w.Codec.Marshal(v)
} }
func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option) error { func (w wrapGrpcCodec) Unmarshal(d []byte, v interface{}) error {
if d == nil || v == nil { if d == nil || v == nil {
return nil return nil
} }
if m, ok := v.(*codec.Frame); ok { switch m := v.(type) {
case *codec.Frame:
m.Data = d m.Data = d
return nil return nil
} }
return w.Codec.Unmarshal(d, v) return w.Codec.Unmarshal(d, v)
} }
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error { func (g *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
return nil return nil
} }
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error { func (g *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
if m, ok := v.(*codec.Frame); ok { // caller has requested a frame
switch m := v.(type) {
case *codec.Frame:
_, err := conn.Read(m.Data) _, err := conn.Read(m.Data)
return err return err
} }
return codec.ErrInvalidMessage return codec.ErrInvalidMessage
} }
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error { func (g *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
// if we don't have a body // if we don't have a body
if v != nil { if v != nil {
b, err := w.Marshal(v) b, err := g.Marshal(v)
if err != nil { if err != nil {
return err return err
} }

16
context.go Normal file
View File

@ -0,0 +1,16 @@
package grpc
import (
"context"
"github.com/unistack-org/micro/v3/server"
)
func setServerOption(k, v interface{}) server.Option {
return func(o *server.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@ -6,23 +6,25 @@ import (
"net/http" "net/http"
"os" "os"
"go.unistack.org/micro/v3/errors" "github.com/unistack-org/micro/v3/errors"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
) )
var errMapping = map[int32]codes.Code{ var (
http.StatusOK: codes.OK, errMapping = map[int32]codes.Code{
http.StatusBadRequest: codes.InvalidArgument, http.StatusOK: codes.OK,
http.StatusRequestTimeout: codes.DeadlineExceeded, http.StatusBadRequest: codes.InvalidArgument,
http.StatusNotFound: codes.NotFound, http.StatusRequestTimeout: codes.DeadlineExceeded,
http.StatusConflict: codes.AlreadyExists, http.StatusNotFound: codes.NotFound,
http.StatusForbidden: codes.PermissionDenied, http.StatusConflict: codes.AlreadyExists,
http.StatusUnauthorized: codes.Unauthenticated, http.StatusForbidden: codes.PermissionDenied,
http.StatusPreconditionFailed: codes.FailedPrecondition, http.StatusUnauthorized: codes.Unauthenticated,
http.StatusNotImplemented: codes.Unimplemented, http.StatusPreconditionFailed: codes.FailedPrecondition,
http.StatusInternalServerError: codes.Internal, http.StatusNotImplemented: codes.Unimplemented,
http.StatusServiceUnavailable: codes.Unavailable, http.StatusInternalServerError: codes.Internal,
} http.StatusServiceUnavailable: codes.Unavailable,
}
)
// convertCode converts a standard Go error into its canonical code. Note that // convertCode converts a standard Go error into its canonical code. Note that
// this is only used to translate the error returned by the server applications. // this is only used to translate the error returned by the server applications.
@ -58,7 +60,8 @@ func microError(err error) codes.Code {
} }
var ec int32 var ec int32
if verr, ok := err.(*errors.Error); ok { switch verr := err.(type) {
case *errors.Error:
ec = verr.Code ec = verr.Code
} }

32
go.mod
View File

@ -1,28 +1,14 @@
module go.unistack.org/micro-server-grpc/v3 module github.com/unistack-org/micro-server-grpc/v3
go 1.20
go 1.15
require ( require (
github.com/golang/protobuf v1.5.4 github.com/golang/protobuf v1.5.2
github.com/quic-go/quic-go v0.41.0 github.com/unistack-org/micro/v3 v3.3.13
go.unistack.org/micro/v3 v3.10.48 golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d
golang.org/x/net v0.22.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/grpc v1.62.1 google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.26.0
) )
require ( //replace github.com/unistack-org/micro/v3 => ../../micro
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/onsi/ginkgo/v2 v2.16.0 // indirect
github.com/stretchr/testify v1.8.3 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect
)

156
go.sum
View File

@ -1,53 +1,111 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/onsi/ginkgo/v2 v2.16.0 h1:7q1w9frJDzninhXxjZd+Y/x54XNjG/UlRLIYPZafsPM= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/onsi/ginkgo/v2 v2.16.0/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/quic-go v0.41.0 h1:aD8MmHfgqTURWNJy48IYFg2OnxwHT3JL7ahGs73lb4k= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/quic-go/quic-go v0.41.0/go.mod h1:qCkNjqczPEvgsOnxZ0eCD14lv+B2LHlFAB++CNOh9hA= github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/unistack-org/micro/v3 v3.3.13 h1:y4bDDkbwnjgOckrhFkC6D/o42tr75X33UbrB+Ko0M68=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/unistack-org/micro/v3 v3.3.13/go.mod h1:98hNcMXp/WyWJwLwCuwrhN1Jm7aCWaRNsMfRjK8Fq+Y=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
go.unistack.org/micro/v3 v3.10.48 h1:Ewht/3gNJNHcG7crQSuq5ys60tgVJYgtQ2oyymJspko= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
go.unistack.org/micro/v3 v3.10.48/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d h1:BgJvlyh+UqCUaPlscHJ+PN8GcpfrFdr7NHjd1JL0+Gs=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/net v0.0.0-20210415231046-e915ea6b2b7d/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

620
grpc.go
View File

@ -1,5 +1,5 @@
// Package grpc provides a grpc server // Package grpc provides a grpc server
package grpc // import "go.unistack.org/micro-server-grpc/v3" package grpc
import ( import (
"context" "context"
@ -7,63 +7,68 @@ import (
"fmt" "fmt"
"net" "net"
"reflect" "reflect"
"runtime/debug"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
// nolint: staticcheck
oldproto "github.com/golang/protobuf/proto" oldproto "github.com/golang/protobuf/proto"
"go.unistack.org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"go.unistack.org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v3/errors" "github.com/unistack-org/micro/v3/errors"
"go.unistack.org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata" metadata "github.com/unistack-org/micro/v3/metadata"
"go.unistack.org/micro/v3/register" "github.com/unistack-org/micro/v3/register"
"go.unistack.org/micro/v3/semconv" "github.com/unistack-org/micro/v3/server"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"golang.org/x/net/netutil" "golang.org/x/net/netutil"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata" gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
grpcreflect "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const ( const (
DefaultContentType = "application/grpc" defaultContentType = "application/grpc"
) )
/* type grpcServerReflection struct {
type ServerReflection struct {
srv *grpc.Server srv *grpc.Server
s *serverReflectionServer s *serverReflectionServer
} }
*/
type Server struct { type grpcServer struct {
handlers map[string]server.Handler rpc *rServer
srv *grpc.Server srv *grpc.Server
exit chan chan error exit chan chan error
wg *msync.WaitGroup wg *sync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
sync.RWMutex sync.RWMutex
started bool opts server.Options
handlers map[string]server.Handler
subscribers map[*subscriber][]broker.Subscriber
init bool
// marks the serve as started
started bool
// used for first registration
registered bool registered bool
reflection bool reflection bool
// register service instance
rsvc *register.Service
codecs map[string]codec.Codec
} }
func newServer(opts ...server.Option) *Server { func newGRPCServer(opts ...server.Option) server.Server {
// create a grpc server // create a grpc server
g := &Server{ g := &grpcServer{
opts: server.NewOptions(opts...), opts: server.NewOptions(opts...),
rpc: &rServer{ rpc: &rServer{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
@ -92,7 +97,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
*/ */
func (g *Server) configure(opts ...server.Option) error { func (g *grpcServer) configure(opts ...server.Option) error {
g.Lock() g.Lock()
defer g.Unlock() defer g.Unlock()
@ -109,6 +114,9 @@ func (g *Server) configure(opts ...server.Option) error {
if err := g.opts.Tracer.Init(); err != nil { if err := g.opts.Tracer.Init(); err != nil {
return err return err
} }
if err := g.opts.Auth.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil { if err := g.opts.Logger.Init(); err != nil {
return err return err
} }
@ -129,10 +137,6 @@ func (g *Server) configure(opts ...server.Option) error {
} }
} }
for _, k := range g.opts.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
maxMsgSize := g.getMaxMsgSize() maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{ gopts := []grpc.ServerOption{
@ -141,8 +145,12 @@ func (g *Server) configure(opts ...server.Option) error {
grpc.UnknownServiceHandler(g.handler), grpc.UnknownServiceHandler(g.handler),
} }
if creds := g.getCredentials(); creds != nil {
gopts = append(gopts, grpc.Creds(creds))
}
if opts := g.getGrpcOptions(); opts != nil { if opts := g.getGrpcOptions(); opts != nil {
gopts = append(opts, gopts...) gopts = append(gopts, opts...)
} }
g.rsvc = nil g.rsvc = nil
@ -159,18 +167,16 @@ func (g *Server) configure(opts ...server.Option) error {
g.reflection = v g.reflection = v
} }
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
g.unknownHandler = h
}
if restart { if restart {
return g.Start() return g.Start()
} }
g.init = true
return nil return nil
} }
func (g *Server) getMaxMsgSize() int { func (g *grpcServer) getMaxMsgSize() int {
if g.opts.Context == nil { if g.opts.Context == nil {
return codec.DefaultMaxMsgSize return codec.DefaultMaxMsgSize
} }
@ -181,7 +187,16 @@ func (g *Server) getMaxMsgSize() int {
return s return s
} }
func (g *Server) getGrpcOptions() []grpc.ServerOption { func (g *grpcServer) getCredentials() credentials.TransportCredentials {
if g.opts.Context != nil {
if v, ok := g.opts.Context.Value(tlsAuth{}).(*tls.Config); ok && v != nil {
return credentials.NewTLS(v)
}
}
return nil
}
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
if g.opts.Context == nil { if g.opts.Context == nil {
return nil return nil
} }
@ -194,40 +209,42 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption {
return opts return opts
} }
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
var err error defer func() {
if r := recover(); r != nil {
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(config.Context, "panic recovered: ", r)
config.Logger.Error(config.Context, string(debug.Stack()))
}
err = errors.InternalServerError(g.opts.Name, "panic recovered: %v", r)
} else if err != nil {
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "grpc handler got error: %s", err)
}
}
}()
if g.wg != nil {
g.wg.Add(1)
defer g.wg.Done()
}
fullMethod, ok := grpc.MethodFromServerStream(stream) fullMethod, ok := grpc.MethodFromServerStream(stream)
if !ok { if !ok {
return status.Errorf(codes.Internal, "method does not exist in context") return status.Errorf(codes.Internal, "method does not exist in context")
} }
ts := time.Now()
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc()
defer func() {
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds())
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec()
st := status.Convert(err)
if st == nil || st.Code() == codes.OK {
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc()
}
}()
serviceName, methodName, err := serviceMethod(fullMethod) serviceName, methodName, err := serviceMethod(fullMethod)
if err != nil { if err != nil {
return status.New(codes.InvalidArgument, err.Error()).Err() return status.New(codes.InvalidArgument, err.Error()).Err()
} }
if g.wg != nil {
g.wg.Add(1)
defer g.wg.Done()
}
// get grpc metadata // get grpc metadata
gmd, ok := gmetadata.FromIncomingContext(stream.Context()) gmd, ok := gmetadata.FromIncomingContext(stream.Context())
if !ok { if !ok {
@ -238,35 +255,15 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
for k, v := range gmd { for k, v := range gmd {
md.Set(k, strings.Join(v, ", ")) md.Set(k, strings.Join(v, ", "))
} }
md.Set("Path", fullMethod)
var td string
// timeout for server deadline // timeout for server deadline
if v, ok := md.Get("timeout"); ok { to, ok := md.Get("timeout")
if ok {
md.Del("timeout") md.Del("timeout")
td = v
}
if v, ok := md.Get("Grpc-Timeout"); ok {
md.Del("Grpc-Timeout")
td = v[:len(v)-1]
switch v[len(v)-1:] {
case "S":
td += "s"
case "M":
td += "m"
case "H":
td += "h"
case "m":
td += "ms"
case "u":
td += "us"
case "n":
td += "ns"
}
} }
// get content type // get content type
ct := DefaultContentType ct := defaultContentType
if ctype, ok := md.Get("content-type"); ok { if ctype, ok := md.Get("content-type"); ok {
ct = ctype ct = ctype
@ -280,13 +277,13 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
// get peer from context // get peer from context
if p, ok := peer.FromContext(stream.Context()); ok { if p, ok := peer.FromContext(stream.Context()); ok {
md.Set("Remote", p.Addr.String()) md["Remote"] = p.Addr.String()
ctx = peer.NewContext(ctx, p) ctx = peer.NewContext(ctx, p)
} }
// set the timeout if we have it // set the timeout if we have it
if len(td) > 0 { if len(to) > 0 {
if n, err := strconv.ParseUint(td, 10, 64); err == nil { if n, err := strconv.ParseUint(to, 10, 64); err == nil {
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
defer cancel() defer cancel()
@ -297,163 +294,197 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
svc := g.rpc.serviceMap[serviceName] svc := g.rpc.serviceMap[serviceName]
g.rpc.mu.RUnlock() g.rpc.mu.RUnlock()
/* if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} svc = &service{}
svc = &service{} svc.typ = reflect.TypeOf(rfl)
svc.typ = reflect.TypeOf(rfl) svc.rcvr = reflect.ValueOf(rfl)
svc.rcvr = reflect.ValueOf(rfl) svc.name = reflect.Indirect(svc.rcvr).Type().Name()
svc.name = reflect.Indirect(svc.rcvr).Type().Name() svc.method = make(map[string]*methodType)
svc.method = make(map[string]*methodType) typ := reflect.TypeOf(rfl)
typ := reflect.TypeOf(rfl) if me, ok := typ.MethodByName("ServerReflectionInfo"); ok {
if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { g.rpc.mu.Lock()
g.rpc.mu.Lock() ep, err := prepareEndpoint(me)
ep, err := prepareEndpoint(me) if ep != nil && err != nil {
if ep != nil && err != nil { svc.method["ServerReflectionInfo"] = ep
svc.method["ServerReflectionInfo"] = ep } else if err != nil {
} else if err != nil { return status.New(codes.Unimplemented, err.Error()).Err()
return status.New(codes.Unimplemented, err.Error()).Err()
}
g.rpc.mu.Unlock()
} }
g.rpc.mu.Unlock()
} }
*/ }
if svc == nil { if svc == nil {
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
} }
mtype := svc.method[methodName] mtype := svc.method[methodName]
if mtype == nil { if mtype == nil {
if g.unknownHandler != nil { return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s.%s", serviceName, methodName)).Err()
return g.unknownHandler(srv, stream)
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
} }
// process unary // process unary
if !mtype.stream { if !mtype.stream {
return g.processRequest(ctx, stream, svc, mtype, ct) return g.processRequest(stream, svc, mtype, ct, ctx)
} }
// process stream // process stream
return g.processStream(ctx, stream, svc, mtype, ct) return g.processStream(stream, svc, mtype, ct, ctx)
} }
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { func (g *grpcServer) processRequest(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error {
// for { for {
var err error var argv, replyv reflect.Value
var argv, replyv reflect.Value
// Decode the argument value. // Decode the argument value.
argIsValue := false // if true, need to indirect before calling. argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr { if mtype.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(mtype.ArgType.Elem()) argv = reflect.New(mtype.ArgType.Elem())
} else { } else {
argv = reflect.New(mtype.ArgType) argv = reflect.New(mtype.ArgType)
argIsValue = true argIsValue = true
}
// Unmarshal request
if err = stream.RecvMsg(argv.Interface()); err != nil {
return err
}
if argIsValue {
argv = argv.Elem()
}
// reply value
replyv = reflect.New(mtype.ReplyType.Elem())
function := mtype.method.Func
var returnValues []reflect.Value
// create a client.Request
r := &rpcRequest{
service: g.opts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
payload: argv.Interface(),
}
// define the handler func
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), argv, reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
} }
return err // Unmarshal request
} if err := stream.RecvMsg(argv.Interface()); err != nil {
// wrap the handler func
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
fn = g.opts.HdlrWrappers[i-1](fn)
}
statusCode := codes.OK
statusDesc := ""
// execute the handler
appErr := fn(ctx, r, replyv.Interface())
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil {
return err return err
} }
}
if appErr != nil { if argIsValue {
var errStatus *status.Status argv = argv.Elem()
switch verr := appErr.(type) {
case *errors.Error:
statusCode = microError(verr)
statusDesc = verr.Error()
errStatus = status.New(statusCode, statusDesc)
case proto.Message:
// user defined error that proto based we can attach it to grpc status
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr))
if err != nil {
return err
}
case (interface{ GRPCStatus() *status.Status }):
errStatus = verr.GRPCStatus()
default:
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
}
// default case user pass own error type that not proto based
statusCode = convertCode(verr)
statusDesc = verr.Error()
errStatus = status.New(statusCode, statusDesc)
} }
return errStatus.Err() // reply value
} replyv = reflect.New(mtype.ReplyType.Elem())
if err := stream.SendMsg(replyv.Interface()); err != nil { function := mtype.method.Func
return err var returnValues []reflect.Value
}
return status.New(statusCode, statusDesc).Err() cf, err := g.newCodec(ct)
if err != nil {
return errors.InternalServerError(g.opts.Name, err.Error())
}
b, err := cf.Marshal(argv.Interface())
if err != nil {
return err
}
// create a client.Request
r := &rpcRequest{
service: g.opts.Name,
contentType: ct,
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
body: b,
payload: argv.Interface(),
}
// define the handler func
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
// The return value for the method is an error.
if rerr := returnValues[0].Interface(); rerr != nil {
err = rerr.(error)
}
return err
}
// wrap the handler func
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
fn = g.opts.HdlrWrappers[i-1](fn)
}
statusCode := codes.OK
statusDesc := ""
// execute the handler
if appErr := fn(ctx, r, replyv.Interface()); appErr != nil {
var errStatus *status.Status
switch verr := appErr.(type) {
case *errors.Error:
statusCode = microError(verr)
statusDesc = verr.Error()
errStatus = status.New(statusCode, statusDesc)
case proto.Message:
// user defined error that proto based we can attach it to grpc status
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
errStatus, err = status.New(statusCode, statusDesc).WithDetails(oldproto.MessageV1(verr))
if err != nil {
return err
}
default:
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
}
// default case user pass own error type that not proto based
statusCode = convertCode(verr)
statusDesc = verr.Error()
errStatus = status.New(statusCode, statusDesc)
}
return errStatus.Err()
}
if err := stream.SendMsg(replyv.Interface()); err != nil {
return err
}
return status.New(statusCode, statusDesc).Err()
}
} }
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { type reflectStream struct {
stream server.Stream
}
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
return s.stream.Send(rsp)
}
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
req := &grpcreflect.ServerReflectionRequest{}
err := s.stream.Recv(req)
return req, err
}
func (s *reflectStream) SetHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SendHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SetTrailer(gmetadata.MD) {
}
func (s *reflectStream) Context() context.Context {
return s.stream.Context()
}
func (s *reflectStream) SendMsg(m interface{}) error {
return s.stream.Send(m)
}
func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m)
}
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream})
}
func (g *grpcServer) processStream(stream grpc.ServerStream, service *service, mtype *methodType, ct string, ctx context.Context) error {
opts := g.opts opts := g.opts
r := &rpcRequest{ r := &rpcRequest{
service: opts.Name, service: opts.Name,
contentType: ct, contentType: ct,
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name), method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
endpoint: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
stream: true, stream: true,
} }
@ -482,13 +513,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
statusCode := codes.OK statusCode := codes.OK
statusDesc := "" statusDesc := ""
appErr := fn(ctx, r, ss) if appErr := fn(ctx, r, ss); appErr != nil {
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil {
return err
}
}
if appErr != nil {
var err error var err error
var errStatus *status.Status var errStatus *status.Status
switch verr := appErr.(type) { switch verr := appErr.(type) {
@ -520,7 +545,7 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
return status.New(statusCode, statusDesc).Err() return status.New(statusCode, statusDesc).Err()
} }
func (g *Server) newCodec(ct string) (codec.Codec, error) { func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
g.RLock() g.RLock()
defer g.RUnlock() defer g.RUnlock()
@ -535,7 +560,7 @@ func (g *Server) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (g *Server) Options() server.Options { func (g *grpcServer) Options() server.Options {
g.RLock() g.RLock()
opts := g.opts opts := g.opts
g.RUnlock() g.RUnlock()
@ -543,15 +568,18 @@ func (g *Server) Options() server.Options {
return opts return opts
} }
func (g *Server) Init(opts ...server.Option) error { func (g *grpcServer) Init(opts ...server.Option) error {
if len(opts) == 0 && g.init {
return nil
}
return g.configure(opts...) return g.configure(opts...)
} }
func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
return newRPCHandler(h, opts...) return newRpcHandler(h, opts...)
} }
func (g *Server) Handle(h server.Handler) error { func (g *grpcServer) Handle(h server.Handler) error {
if err := g.rpc.register(h.Handler()); err != nil { if err := g.rpc.register(h.Handler()); err != nil {
return err return err
} }
@ -560,11 +588,11 @@ func (g *Server) Handle(h server.Handler) error {
return nil return nil
} }
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...) return newSubscriber(topic, sb, opts...)
} }
func (g *Server) Subscribe(sb server.Subscriber) error { func (g *grpcServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber) sub, ok := sb.(*subscriber)
if !ok { if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber") return fmt.Errorf("invalid subscriber: expected *subscriber")
@ -588,7 +616,7 @@ func (g *Server) Subscribe(sb server.Subscriber) error {
return nil return nil
} }
func (g *Server) Register() error { func (g *grpcServer) Register() error {
g.RLock() g.RLock()
rsvc := g.rsvc rsvc := g.rsvc
config := g.opts config := g.opts
@ -609,18 +637,22 @@ func (g *Server) Register() error {
g.RLock() g.RLock()
// Maps are ordered randomly, sort the keys for consistency // Maps are ordered randomly, sort the keys for consistency
handlerList := make([]string, 0, len(g.handlers)) var handlerList []string
for n := range g.handlers { for n, e := range g.handlers {
// Only advertise non internal handlers // Only advertise non internal handlers
handlerList = append(handlerList, n) if !e.Options().Internal {
handlerList = append(handlerList, n)
}
} }
sort.Strings(handlerList) sort.Strings(handlerList)
subscriberList := make([]*subscriber, 0, len(g.subscribers)) var subscriberList []*subscriber
for e := range g.subscribers { for e := range g.subscribers {
// Only advertise non internal subscribers // Only advertise non internal subscribers
subscriberList = append(subscriberList, e) if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
} }
sort.Slice(subscriberList, func(i, j int) bool { sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic return subscriberList[i].topic > subscriberList[j].topic
@ -636,7 +668,7 @@ func (g *Server) Register() error {
g.RUnlock() g.RUnlock()
service.Nodes[0].Metadata["protocol"] = "grpc" service.Nodes[0].Metadata["protocol"] = "grpc"
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] service.Nodes[0].Metadata["transport"] = "grpc"
service.Endpoints = endpoints service.Endpoints = endpoints
g.RLock() g.RLock()
@ -645,7 +677,7 @@ func (g *Server) Register() error {
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID) config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].Id)
} }
} }
@ -662,13 +694,37 @@ func (g *Server) Register() error {
g.Lock() g.Lock()
defer g.Unlock() defer g.Unlock()
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
g.registered = true g.registered = true
g.rsvc = service g.rsvc = service
return nil return nil
} }
func (g *Server) Deregister() error { func (g *grpcServer) Deregister() error {
var err error var err error
g.RLock() g.RLock()
@ -681,7 +737,7 @@ func (g *Server) Deregister() error {
} }
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID) config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].Id)
} }
if err := server.DefaultDeregisterFunc(service, config); err != nil { if err := server.DefaultDeregisterFunc(service, config); err != nil {
@ -722,7 +778,7 @@ func (g *Server) Deregister() error {
return nil return nil
} }
func (g *Server) Start() error { func (g *grpcServer) Start() error {
g.RLock() g.RLock()
if g.started { if g.started {
g.RUnlock() g.RUnlock()
@ -732,13 +788,18 @@ func (g *Server) Start() error {
config := g.Options() config := g.Options()
for _, k := range config.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
// micro: config.Transport.Listen(config.Address) // micro: config.Transport.Listen(config.Address)
var ts net.Listener var ts net.Listener
var err error
if l := config.Listener; l != nil { if l := config.Listener; l != nil {
ts = l ts = l
} else { } else {
var err error
// check the tls config for secure connect // check the tls config for secure connect
if tc := config.TLSConfig; tc != nil { if tc := config.TLSConfig; tc != nil {
ts, err = tls.Listen("tcp", config.Address, tc) ts, err = tls.Listen("tcp", config.Address, tc)
@ -768,7 +829,7 @@ func (g *Server) Start() error {
// only connect if we're subscribed // only connect if we're subscribed
if len(g.subscribers) > 0 { if len(g.subscribers) > 0 {
// connect to the broker // connect to the broker
if err = config.Broker.Connect(config.Context); err != nil { if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err) config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err)
} }
@ -781,31 +842,26 @@ func (g *Server) Start() error {
} }
// use RegisterCheck func before register // use RegisterCheck func before register
// nolint: nestif if err := g.opts.RegisterCheck(config.Context); err != nil {
if err = g.opts.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.Id, err)
} }
} else { } else {
// announce self to the world // announce self to the world
if err = g.Register(); err != nil { if err := g.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server register error: %v", err) config.Logger.Errorf(config.Context, "Server register error: %v", err)
} }
} }
} }
if err = g.subscribe(); err != nil {
return err
}
// micro: go ts.Accept(s.accept) // micro: go ts.Accept(s.accept)
go func() { go func() {
if err = g.srv.Serve(ts); err != nil { if err := g.srv.Serve(ts); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err) config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err)
} }
if err = g.Stop(); err != nil { if err := g.Stop(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err) config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err)
} }
@ -834,26 +890,25 @@ func (g *Server) Start() error {
registered := g.registered registered := g.registered
g.RUnlock() g.RUnlock()
rerr := g.opts.RegisterCheck(g.opts.Context) rerr := g.opts.RegisterCheck(g.opts.Context)
// nolint: nestif
if rerr != nil && registered { if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
} }
// deregister self in case of error // deregister self in case of error
if err = g.Deregister(); err != nil { if err := g.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err) config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.Id, err)
} }
} }
} else if rerr != nil && !registered { } else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr) config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.Id, rerr)
} }
continue continue
} }
if err = g.Register(); err != nil { if err := g.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err) config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.Id, err)
} }
} }
// wait for exit // wait for exit
@ -863,9 +918,9 @@ func (g *Server) Start() error {
} }
// deregister self // deregister self
if err = g.Deregister(); err != nil { if err := g.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server deregister error: %v", err) config.Logger.Errorf(config.Context, "Server deregister error: ", err)
} }
} }
@ -884,7 +939,7 @@ func (g *Server) Start() error {
select { select {
case <-exit: case <-exit:
case <-time.After(g.opts.GracefulTimeout): case <-time.After(time.Second):
g.srv.Stop() g.srv.Stop()
} }
@ -895,7 +950,7 @@ func (g *Server) Start() error {
config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
} }
// disconnect broker // disconnect broker
if err = config.Broker.Disconnect(config.Context); err != nil { if err := config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) { if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err) config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err)
} }
@ -910,38 +965,7 @@ func (g *Server) Start() error {
return nil return nil
} }
func (g *Server) subscribe() error { func (g *grpcServer) Stop() error {
config := g.opts
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (g *Server) Stop() error {
g.RLock() g.RLock()
if !g.started { if !g.started {
g.RUnlock() g.RUnlock()
@ -961,18 +985,14 @@ func (g *Server) Stop() error {
return err return err
} }
func (g *Server) String() string { func (g *grpcServer) String() string {
return "grpc" return "grpc"
} }
func (g *Server) Name() string { func (g *grpcServer) Name() string {
return g.opts.Name return g.opts.Name
} }
func (g *Server) GRPCServer() *grpc.Server { func NewServer(opts ...server.Option) server.Server {
return g.srv return newGRPCServer(opts...)
}
func NewServer(opts ...server.Option) *Server {
return newServer(opts...)
} }

View File

@ -3,18 +3,18 @@ package grpc
import ( import (
"reflect" "reflect"
"go.unistack.org/micro/v3/register" "github.com/unistack-org/micro/v3/register"
"go.unistack.org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
type rpcHandler struct { type rpcHandler struct {
opts server.HandlerOptions
handler interface{}
name string name string
handler interface{}
endpoints []*register.Endpoint endpoints []*register.Endpoint
opts server.HandlerOptions
} }
func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler { func newRpcHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
options := server.NewHandlerOptions(opts...) options := server.NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler) typ := reflect.TypeOf(handler)

View File

@ -2,19 +2,18 @@ package grpc
import ( import (
"context" "context"
"crypto/tls"
"go.unistack.org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
type ( type codecsKey struct{}
codecsKey struct{} type grpcOptions struct{}
grpcOptions struct{} type maxMsgSizeKey struct{}
maxMsgSizeKey struct{} type tlsAuth struct{}
reflectionKey struct{} type reflectionKey struct{}
unknownServiceHandlerKey struct{}
)
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) server.Option { func Codec(contentType string, c encoding.Codec) server.Option {
@ -31,24 +30,25 @@ func Codec(contentType string, c encoding.Codec) server.Option {
} }
} }
// AuthTLS should be used to setup a secure authentication using TLS
func AuthTLS(t *tls.Config) server.Option {
return setServerOption(tlsAuth{}, t)
}
// Options to be used to configure gRPC options // Options to be used to configure gRPC options
func Options(opts ...grpc.ServerOption) server.Option { func Options(opts ...grpc.ServerOption) server.Option {
return server.SetOption(grpcOptions{}, opts) return setServerOption(grpcOptions{}, opts)
} }
// //
// MaxMsgSize set the maximum message in bytes the server can receive and // MaxMsgSize set the maximum message in bytes the server can receive and
// send. Default maximum message size is 4 MB. // send. Default maximum message size is 4 MB.
//
func MaxMsgSize(s int) server.Option { func MaxMsgSize(s int) server.Option {
return server.SetOption(maxMsgSizeKey{}, s) return setServerOption(maxMsgSizeKey{}, s)
} }
// Reflection enables reflection support in grpc server // Reflection enables reflection support in grpc server
func Reflection(b bool) server.Option { func Reflection(b bool) server.Option {
return server.SetOption(reflectionKey{}, b) return setServerOption(reflectionKey{}, b)
}
// UnknownServiceHandler enables support for all services
func UnknownServiceHandler(h grpc.StreamHandler) server.Option {
return server.SetOption(unknownServiceHandlerKey{}, h)
} }

View File

@ -1,166 +0,0 @@
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
// copyright sssgun with MIT license
package grpcquic // import "go.unistack.org/micro-server-grpc/v3/quic"
import (
"context"
"crypto/tls"
"net"
"time"
quic "github.com/quic-go/quic-go"
)
///////////////////////////////////////////////////////////////////////////////
// Connection
var _ net.Conn = (*Conn)(nil)
type Conn struct {
conn quic.Connection
stream quic.Stream
}
func NewConn(conn quic.Connection) (net.Conn, error) {
stream, err := conn.OpenStreamSync(context.Background())
if err != nil {
return nil, err
}
return &Conn{conn, stream}, nil
}
// Read reads data from the connection.
// Read can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetReadDeadline.
func (c *Conn) Read(b []byte) (n int, err error) {
return c.stream.Read(b)
}
// Write writes data to the connection.
// Write can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetWriteDeadline.
func (c *Conn) Write(b []byte) (n int, err error) {
return c.stream.Write(b)
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *Conn) Close() error {
// @TODO: log this
c.stream.Close()
return c.conn.CloseWithError(0, "")
}
// LocalAddr returns the local network address.
func (c *Conn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail with a timeout (see type Error) instead of
// blocking. The deadline applies to all future and pending
// I/O, not just the immediately following call to Read or
// Write. After a deadline has been exceeded, the connection
// can be refreshed by setting a deadline in the future.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}
///////////////////////////////////////////////////////////////////////////////
// Listener
var _ net.Listener = (*Listener)(nil)
type Listener struct {
ql quic.Listener
}
func Listen(ql quic.Listener) net.Listener {
return &Listener{ql}
}
// Accept waits for and returns the next connection to the listener.
func (l *Listener) Accept() (net.Conn, error) {
sess, err := l.ql.Accept(context.Background())
if err != nil {
return nil, err
}
stream, err := sess.AcceptStream(context.Background())
if err != nil {
return nil, err
}
return &Conn{sess, stream}, nil
}
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (l *Listener) Close() error {
return l.ql.Close()
}
// Addr returns the listener's network address.
func (l *Listener) Addr() net.Addr {
return l.ql.Addr()
}
///////////////////////////////////////////////////////////////////////////////
// Dialer
var QuicConfig = &quic.Config{
KeepAlivePeriod: 10 * time.Second,
}
func NewPacketConn(addr string) (net.PacketConn, error) {
// create a packet conn for outgoing connections
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
return net.ListenUDP("udp", udpAddr)
}
func NewQuicDialer(tlsConf *tls.Config) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, target string) (net.Conn, error) {
sess, err := quic.DialAddr(ctx, target, tlsConf, QuicConfig)
if err != nil {
return nil, err
}
return NewConn(sess)
}
}

View File

@ -1,117 +0,0 @@
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
// copyright sssgun with MIT license
package grpcquic
import (
"context"
"crypto/tls"
"net"
"google.golang.org/grpc/credentials"
)
var _ credentials.AuthInfo = (*Info)(nil)
// Info contains the auth information
type Info struct {
conn *Conn
}
func NewInfo(c *Conn) *Info {
return &Info{c}
}
// AuthType returns the type of Info as a string.
func (i *Info) AuthType() string {
return "quic-tls"
}
func (i *Info) Conn() net.Conn {
return i.conn
}
var _ credentials.TransportCredentials = (*Credentials)(nil)
type Credentials struct {
tlsConfig *tls.Config
isQuicConnection bool
serverName string
grpcCreds credentials.TransportCredentials
}
func NewCredentials(tlsConfig *tls.Config) credentials.TransportCredentials {
grpcCreds := credentials.NewTLS(tlsConfig)
return &Credentials{
grpcCreds: grpcCreds,
tlsConfig: tlsConfig,
}
}
// ClientHandshake does the authentication handshake specified by the corresponding
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
// Implementations must use the provided context to implement timely cancellation.
// gRPC will try to reconnect if the error returned is a temporary error
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
// If the returned error is a wrapper error, implementations should make sure that
// the error implements Temporary() to have the correct retry behaviors.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
func (pt *Credentials) ClientHandshake(ctx context.Context, authority string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if c, ok := conn.(*Conn); ok {
pt.isQuicConnection = true
return conn, NewInfo(c), nil
}
return pt.grpcCreds.ClientHandshake(ctx, authority, conn)
}
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about
// the connection.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
func (pt *Credentials) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if c, ok := conn.(*Conn); ok {
pt.isQuicConnection = true
ainfo := NewInfo(c)
return conn, ainfo, nil
}
return pt.grpcCreds.ServerHandshake(conn)
}
// Info provides the ProtocolInfo of this Credentials.
func (pt *Credentials) Info() credentials.ProtocolInfo {
if pt.isQuicConnection {
return credentials.ProtocolInfo{
// ProtocolVersion is the gRPC wire protocol version.
ProtocolVersion: "/quic/1.0.0",
// SecurityProtocol is the security protocol in use.
SecurityProtocol: "quic-tls",
// SecurityVersion is the security protocol version.
// SecurityVersion: "1.2.0",
// ServerName is the user-configured server name.
ServerName: pt.serverName,
}
}
return pt.grpcCreds.Info()
}
// Clone makes a copy of this Credentials.
func (pt *Credentials) Clone() credentials.TransportCredentials {
return &Credentials{
tlsConfig: pt.tlsConfig.Clone(),
grpcCreds: pt.grpcCreds.Clone(),
}
}
// OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
// gRPC internals also use it to override the virtual hosting name if it is set.
// It must be called before dialing. Currently, this is only used by grpclb.
func (pt *Credentials) OverrideServerName(name string) error {
pt.serverName = name
return pt.grpcCreds.OverrideServerName(name)
}

View File

@ -1,5 +1,3 @@
// +build ignore
/* /*
* *
* Copyright 2016 gRPC authors. * Copyright 2016 gRPC authors.

View File

@ -3,34 +3,31 @@ package grpc
import ( import (
"io" "io"
"go.unistack.org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
var (
_ server.Request = &rpcRequest{}
_ server.Message = &rpcMessage{}
) )
type rpcRequest struct { type rpcRequest struct {
rw io.ReadWriter rw io.ReadWriter
payload interface{} service string
codec codec.Codec
header metadata.Metadata
method string method string
endpoint string endpoint string
target string
contentType string contentType string
service string codec codec.Codec
header metadata.Metadata
body []byte
stream bool stream bool
payload interface{}
} }
type rpcMessage struct { type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string topic string
contentType string contentType string
payload interface{}
header metadata.Metadata
body []byte
codec codec.Codec
} }
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
@ -46,7 +43,7 @@ func (r *rpcRequest) Method() string {
} }
func (r *rpcRequest) Endpoint() string { func (r *rpcRequest) Endpoint() string {
return r.endpoint return r.method
} }
func (r *rpcRequest) Codec() codec.Codec { func (r *rpcRequest) Codec() codec.Codec {
@ -81,7 +78,7 @@ func (r *rpcMessage) Topic() string {
return r.topic return r.topic
} }
func (r *rpcMessage) Body() interface{} { func (r *rpcMessage) Payload() interface{} {
return r.payload return r.payload
} }
@ -89,6 +86,10 @@ func (r *rpcMessage) Header() metadata.Metadata {
return r.header return r.header
} }
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Codec { func (r *rpcMessage) Codec() codec.Codec {
return r.codec return r.codec
} }

View File

@ -3,17 +3,18 @@ package grpc
import ( import (
"io" "io"
"go.unistack.org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
) )
var _ server.Response = &rpcResponse{}
type rpcResponse struct { type rpcResponse struct {
rw io.ReadWriter rw io.ReadWriter
header metadata.Metadata header metadata.Metadata
codec codec.Codec codec codec.Codec
endpoint string
service string
method string
target string
} }
func (r *rpcResponse) Codec() codec.Codec { func (r *rpcResponse) Codec() codec.Codec {

View File

@ -14,35 +14,37 @@ import (
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
"go.unistack.org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
// Precompute the reflect type for error. Can't use error directly var (
// because Typeof takes an empty interface value. This is annoying. // Precompute the reflect type for error. Can't use error directly
var typeOfError = reflect.TypeOf((*error)(nil)).Elem() // because Typeof takes an empty interface value. This is annoying.
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type methodType struct { type methodType struct {
method reflect.Method
ArgType reflect.Type ArgType reflect.Type
ReplyType reflect.Type ReplyType reflect.Type
ContextType reflect.Type ContextType reflect.Type
method reflect.Method
stream bool stream bool
} }
// type reflectionType func(context.Context, server.Stream) error type reflectionType func(context.Context, server.Stream) error
type service struct { type service struct {
typ reflect.Type name string // name of service
method map[string]*methodType rcvr reflect.Value // receiver of methods for the service
rcvr reflect.Value typ reflect.Type // type of the receiver
name string method map[string]*methodType // registered methods
} }
// server represents an RPC Server. // server represents an RPC Server.
type rServer struct { type rServer struct {
mu sync.RWMutex // protects the serviceMap
serviceMap map[string]*service serviceMap map[string]*service
mu sync.RWMutex reflection bool
// reflection bool
} }
// Is this an exported - upper case - name? // Is this an exported - upper case - name?
@ -89,14 +91,15 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) {
return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn()) return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
} }
switch stream { if stream {
case true:
// check stream type // check stream type
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem() streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
if !argType.Implements(streamType) { if !argType.Implements(streamType) {
return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType) return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
} }
default: } else {
// if not stream check the replyType
// First arg need not be a pointer. // First arg need not be a pointer.
if !isExportedOrBuiltinType(argType) { if !isExportedOrBuiltinType(argType) {
return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType) return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType)

View File

@ -3,7 +3,7 @@ package grpc
import ( import (
"context" "context"
"go.unistack.org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
"google.golang.org/grpc" "google.golang.org/grpc"
) )

View File

@ -4,18 +4,25 @@ import (
"context" "context"
"fmt" "fmt"
"reflect" "reflect"
"runtime/debug"
"strings" "strings"
"go.unistack.org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"go.unistack.org/micro/v3/metadata" "github.com/unistack-org/micro/v3/errors"
"go.unistack.org/micro/v3/register" "github.com/unistack-org/micro/v3/logger"
"go.unistack.org/micro/v3/server" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/server"
)
const (
subSig = "func(context.Context, interface{}) error"
) )
type handler struct { type handler struct {
method reflect.Value
reqType reflect.Type reqType reflect.Type
ctxType reflect.Type ctxType reflect.Type
method reflect.Value
} }
type subscriber struct { type subscriber struct {
@ -99,8 +106,17 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
} }
} }
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) { return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
if g.opts.Logger.V(logger.ErrorLevel) {
g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r)
g.opts.Logger.Error(g.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msg := p.Message() msg := p.Message()
// if we don't have headers, create empty map // if we don't have headers, create empty map
@ -110,8 +126,8 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
if len(ct) == 0 { if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType msg.Header["Content-Type"] = defaultContentType
ct = DefaultContentType ct = defaultContentType
} }
cf, err := g.newCodec(ct) cf, err := g.newCodec(ct)
if err != nil { if err != nil {
@ -159,7 +175,7 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg.Body())) vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil { if rerr := returnValues[0].Interface(); rerr != nil {
@ -179,13 +195,14 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
if g.wg != nil { if g.wg != nil {
defer g.wg.Done() defer g.wg.Done()
} }
cerr := fn(ctx, &rpcMessage{ err := fn(ctx, &rpcMessage{
topic: sb.topic, topic: sb.topic,
contentType: ct, contentType: ct,
payload: req.Interface(), payload: req.Interface(),
header: msg.Header, header: msg.Header,
body: msg.Body,
}) })
results <- cerr results <- err
}() }()
} }
var errors []string var errors []string

View File

@ -39,7 +39,6 @@ func serviceMethod(m string) (string, string, error) {
return parts[0], parts[1], nil return parts[0], parts[1], nil
} }
/*
// ServiceFromMethod returns the service // ServiceFromMethod returns the service
// /service.Foo/Bar => service // /service.Foo/Bar => service
func serviceFromMethod(m string) string { func serviceFromMethod(m string) string {
@ -56,4 +55,3 @@ func serviceFromMethod(m string) string {
parts = strings.Split(parts[1], ".") parts = strings.Split(parts[1], ".")
return strings.Join(parts[:len(parts)-1], ".") return strings.Join(parts[:len(parts)-1], ".")
} }
*/