Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
939d51a2a8 |
19
.github/dependabot.yml
vendored
19
.github/dependabot.yml
vendored
@@ -1,19 +0,0 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
|
||||
# Maintain dependencies for GitHub Actions
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
|
||||
# Maintain dependencies for Golang
|
||||
- package-ecosystem: "gomod"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
13
.github/stale.sh
vendored
Executable file
13
.github/stale.sh
vendored
Executable file
@@ -0,0 +1,13 @@
|
||||
#!/bin/bash -ex
|
||||
|
||||
export PATH=$PATH:$(pwd)/bin
|
||||
export GO111MODULE=on
|
||||
export GOBIN=$(pwd)/bin
|
||||
|
||||
#go get github.com/rvflash/goup@v0.4.1
|
||||
|
||||
#goup -v ./...
|
||||
#go get github.com/psampaz/go-mod-outdated@v0.6.0
|
||||
go list -u -m -mod=mod -json all | go-mod-outdated -update -direct -ci || true
|
||||
|
||||
#go list -u -m -json all | go-mod-outdated -update
|
20
.github/workflows/autoapprove.yml
vendored
20
.github/workflows/autoapprove.yml
vendored
@@ -1,20 +0,0 @@
|
||||
name: "autoapprove"
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [assigned, opened, synchronize, reopened]
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
autoapprove:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: approve
|
||||
uses: hmarr/auto-approve-action@v3
|
||||
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
|
||||
id: approve
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
21
.github/workflows/automerge.yml
vendored
21
.github/workflows/automerge.yml
vendored
@@ -1,21 +0,0 @@
|
||||
name: "automerge"
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [assigned, opened, synchronize, reopened]
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
automerge:
|
||||
runs-on: ubuntu-latest
|
||||
if: github.actor == 'vtolstov'
|
||||
steps:
|
||||
- name: merge
|
||||
id: merge
|
||||
run: gh pr merge --auto --merge "$PR_URL"
|
||||
env:
|
||||
PR_URL: ${{github.event.pull_request.html_url}}
|
||||
GITHUB_TOKEN: ${{secrets.TOKEN}}
|
13
.github/workflows/build.yml
vendored
13
.github/workflows/build.yml
vendored
@@ -3,20 +3,19 @@ on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
jobs:
|
||||
test:
|
||||
name: test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup
|
||||
uses: actions/setup-go@v3
|
||||
uses: actions/setup-go@v1
|
||||
with:
|
||||
go-version: 1.17
|
||||
go-version: 1.15
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
uses: actions/checkout@v2
|
||||
- name: cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
|
||||
@@ -32,9 +31,9 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
uses: actions/checkout@v2
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
uses: golangci/golangci-lint-action@v1
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
|
78
.github/workflows/codeql-analysis.yml
vendored
78
.github/workflows/codeql-analysis.yml
vendored
@@ -1,78 +0,0 @@
|
||||
# For most projects, this workflow file will not need changing; you simply need
|
||||
# to commit it to your repository.
|
||||
#
|
||||
# You may wish to alter this file to override the set of languages analyzed,
|
||||
# or to provide custom queries or build logic.
|
||||
#
|
||||
# ******** NOTE ********
|
||||
# We have attempted to detect the languages in your repository. Please check
|
||||
# the `language` matrix defined below to confirm you have the correct set of
|
||||
# supported CodeQL languages.
|
||||
#
|
||||
name: "codeql"
|
||||
|
||||
on:
|
||||
workflow_run:
|
||||
workflows: ["prbuild"]
|
||||
types:
|
||||
- completed
|
||||
push:
|
||||
branches: [ master, v3 ]
|
||||
pull_request:
|
||||
# The branches below must be a subset of the branches above
|
||||
branches: [ master, v3 ]
|
||||
schedule:
|
||||
- cron: '34 1 * * 0'
|
||||
|
||||
jobs:
|
||||
analyze:
|
||||
name: analyze
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
actions: read
|
||||
contents: read
|
||||
security-events: write
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
language: [ 'go' ]
|
||||
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
|
||||
# Learn more:
|
||||
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed
|
||||
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: setup
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: init
|
||||
uses: github/codeql-action/init@v2
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||
# By default, queries listed here will override any specified in a config file.
|
||||
# Prefix the list here with "+" to use these queries and those in the config file.
|
||||
# queries: ./path/to/local/query, your-org/your-repo/queries@main
|
||||
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
- name: autobuild
|
||||
uses: github/codeql-action/autobuild@v2
|
||||
|
||||
# ℹ️ Command-line programs to run using the OS shell.
|
||||
# 📚 https://git.io/JvXDl
|
||||
|
||||
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
|
||||
# and modify them (or add more) to build your code if your project
|
||||
# uses a compiled language
|
||||
|
||||
#- run: |
|
||||
# make bootstrap
|
||||
# make release
|
||||
|
||||
- name: analyze
|
||||
uses: github/codeql-action/analyze@v2
|
27
.github/workflows/dependabot-automerge.yml
vendored
27
.github/workflows/dependabot-automerge.yml
vendored
@@ -1,27 +0,0 @@
|
||||
name: "dependabot-automerge"
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [assigned, opened, synchronize, reopened]
|
||||
|
||||
permissions:
|
||||
pull-requests: write
|
||||
contents: write
|
||||
|
||||
jobs:
|
||||
automerge:
|
||||
runs-on: ubuntu-latest
|
||||
if: github.actor == 'dependabot[bot]'
|
||||
steps:
|
||||
- name: metadata
|
||||
id: metadata
|
||||
uses: dependabot/fetch-metadata@v1.3.6
|
||||
with:
|
||||
github-token: "${{ secrets.TOKEN }}"
|
||||
- name: merge
|
||||
id: merge
|
||||
if: ${{contains(steps.metadata.outputs.dependency-names, 'go.unistack.org')}}
|
||||
run: gh pr merge --auto --merge "$PR_URL"
|
||||
env:
|
||||
PR_URL: ${{github.event.pull_request.html_url}}
|
||||
GITHUB_TOKEN: ${{secrets.TOKEN}}
|
13
.github/workflows/pr.yml
vendored
13
.github/workflows/pr.yml
vendored
@@ -3,20 +3,19 @@ on:
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
- v3
|
||||
jobs:
|
||||
test:
|
||||
name: test
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup
|
||||
uses: actions/setup-go@v3
|
||||
uses: actions/setup-go@v1
|
||||
with:
|
||||
go-version: 1.17
|
||||
go-version: 1.15
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
uses: actions/checkout@v2
|
||||
- name: cache
|
||||
uses: actions/cache@v3
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ~/go/pkg/mod
|
||||
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
|
||||
@@ -32,9 +31,9 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
uses: actions/checkout@v2
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
uses: golangci/golangci-lint-action@v1
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
|
@@ -1,44 +0,0 @@
|
||||
run:
|
||||
concurrency: 4
|
||||
deadline: 5m
|
||||
issues-exit-code: 1
|
||||
tests: true
|
||||
|
||||
linters-settings:
|
||||
govet:
|
||||
check-shadowing: true
|
||||
enable:
|
||||
- fieldalignment
|
||||
|
||||
linters:
|
||||
enable:
|
||||
- govet
|
||||
- deadcode
|
||||
- errcheck
|
||||
- govet
|
||||
- ineffassign
|
||||
- staticcheck
|
||||
- structcheck
|
||||
- typecheck
|
||||
- unused
|
||||
- varcheck
|
||||
- bodyclose
|
||||
- gci
|
||||
- goconst
|
||||
- gocritic
|
||||
- gosimple
|
||||
- gofmt
|
||||
- gofumpt
|
||||
- goimports
|
||||
- golint
|
||||
- gosec
|
||||
- makezero
|
||||
- misspell
|
||||
- nakedret
|
||||
- nestif
|
||||
- nilerr
|
||||
- noctx
|
||||
- prealloc
|
||||
- unconvert
|
||||
- unparam
|
||||
disable-all: false
|
192
LICENSE
192
LICENSE
@@ -1,192 +0,0 @@
|
||||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
Copyright 2015-2020 Asim Aslam.
|
||||
Copyright 2019-2020 Unistack LLC.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
30
README.md
Normal file
30
README.md
Normal file
@@ -0,0 +1,30 @@
|
||||
# GRPC Server
|
||||
|
||||
The grpc server is a [micro.Server](https://godoc.org/github.com/micro/go-micro/server#Server) compatible server.
|
||||
|
||||
## Overview
|
||||
|
||||
The server makes use of the [google.golang.org/grpc](google.golang.org/grpc) framework for the underlying server
|
||||
but continues to use micro handler signatures and protoc-gen-micro generated code.
|
||||
|
||||
## Usage
|
||||
|
||||
Specify the server to your micro service
|
||||
|
||||
```go
|
||||
import (
|
||||
"github.com/micro/go-micro"
|
||||
"github.com/micro/go-plugins/server/grpc"
|
||||
)
|
||||
|
||||
func main() {
|
||||
service := micro.NewService(
|
||||
// This needs to be first as it replaces the underlying server
|
||||
// which causes any configuration set before it
|
||||
// to be discarded
|
||||
micro.Server(grpc.NewServer()),
|
||||
micro.Name("greeter"),
|
||||
)
|
||||
}
|
||||
```
|
||||
**NOTE**: Setting the gRPC server and/or client causes the underlying the server/client to be replaced which causes any previous configuration set on that server/client to be discarded. It is therefore recommended to set gRPC server/client before any other configuration
|
232
codec.go
232
codec.go
@@ -1,77 +1,233 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"io"
|
||||
b "bytes"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
oldjsonpb "github.com/golang/protobuf/jsonpb"
|
||||
oldproto "github.com/golang/protobuf/proto"
|
||||
bytes "github.com/unistack-org/micro-codec-bytes"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/encoding"
|
||||
"google.golang.org/grpc/metadata"
|
||||
jsonpb "google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type jsonCodec struct{}
|
||||
type bytesCodec struct{}
|
||||
type protoCodec struct{}
|
||||
type wrapCodec struct{ encoding.Codec }
|
||||
|
||||
var (
|
||||
jsonpbMarshaler = jsonpb.MarshalOptions{
|
||||
UseEnumNumbers: false,
|
||||
EmitUnpopulated: false,
|
||||
UseProtoNames: true,
|
||||
AllowPartial: false,
|
||||
}
|
||||
|
||||
jsonpbUnmarshaler = jsonpb.UnmarshalOptions{
|
||||
DiscardUnknown: false,
|
||||
AllowPartial: false,
|
||||
}
|
||||
|
||||
oldjsonpbMarshaler = oldjsonpb.Marshaler{
|
||||
OrigName: true,
|
||||
EmitDefaults: false,
|
||||
}
|
||||
|
||||
oldjsonpbUnmarshaler = oldjsonpb.Unmarshaler{
|
||||
AllowUnknownFields: false,
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
_ codec.Codec = &wrapGrpcCodec{}
|
||||
_ encoding.Codec = &wrapMicroCodec{}
|
||||
defaultGRPCCodecs = map[string]encoding.Codec{
|
||||
"application/json": jsonCodec{},
|
||||
"application/proto": protoCodec{},
|
||||
"application/protobuf": protoCodec{},
|
||||
"application/octet-stream": protoCodec{},
|
||||
"application/grpc": protoCodec{},
|
||||
"application/grpc+json": jsonCodec{},
|
||||
"application/grpc+proto": protoCodec{},
|
||||
"application/grpc+bytes": bytesCodec{},
|
||||
}
|
||||
)
|
||||
|
||||
type wrapMicroCodec struct{ codec.Codec }
|
||||
|
||||
func (w *wrapMicroCodec) Name() string {
|
||||
return w.Codec.String()
|
||||
}
|
||||
|
||||
func (w *wrapMicroCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
return w.Codec.Marshal(v)
|
||||
}
|
||||
|
||||
func (w *wrapMicroCodec) Unmarshal(d []byte, v interface{}) error {
|
||||
return w.Codec.Unmarshal(d, v)
|
||||
}
|
||||
|
||||
type wrapGrpcCodec struct{ encoding.Codec }
|
||||
|
||||
func (w *wrapGrpcCodec) String() string {
|
||||
func (w wrapCodec) String() string {
|
||||
return w.Codec.Name()
|
||||
}
|
||||
|
||||
func (w *wrapGrpcCodec) Marshal(v interface{}, opts ...codec.Option) ([]byte, error) {
|
||||
if m, ok := v.(*codec.Frame); ok {
|
||||
func (w wrapCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
switch m := v.(type) {
|
||||
case *bytes.Frame:
|
||||
return m.Data, nil
|
||||
}
|
||||
return w.Codec.Marshal(v)
|
||||
}
|
||||
|
||||
func (w *wrapGrpcCodec) Unmarshal(d []byte, v interface{}, opts ...codec.Option) error {
|
||||
if d == nil || v == nil {
|
||||
func (w wrapCodec) Unmarshal(data []byte, v interface{}) error {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
if m, ok := v.(*codec.Frame); ok {
|
||||
m.Data = d
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return w.Codec.Unmarshal(d, v)
|
||||
switch m := v.(type) {
|
||||
case *bytes.Frame:
|
||||
m.Data = data
|
||||
return nil
|
||||
}
|
||||
return w.Codec.Unmarshal(data, v)
|
||||
}
|
||||
|
||||
func (w *wrapGrpcCodec) ReadHeader(conn io.Reader, m *codec.Message, mt codec.MessageType) error {
|
||||
return nil
|
||||
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
switch m := v.(type) {
|
||||
case proto.Message:
|
||||
return proto.Marshal(m)
|
||||
case oldproto.Message:
|
||||
return oldproto.Marshal(m)
|
||||
}
|
||||
return nil, codec.ErrInvalidMessage
|
||||
}
|
||||
|
||||
func (w *wrapGrpcCodec) ReadBody(conn io.Reader, v interface{}) error {
|
||||
if m, ok := v.(*codec.Frame); ok {
|
||||
_, err := conn.Read(m.Data)
|
||||
return err
|
||||
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
switch m := v.(type) {
|
||||
case proto.Message:
|
||||
return proto.Unmarshal(data, m)
|
||||
case oldproto.Message:
|
||||
return oldproto.Unmarshal(data, m)
|
||||
}
|
||||
return codec.ErrInvalidMessage
|
||||
}
|
||||
|
||||
func (w *wrapGrpcCodec) Write(conn io.Writer, m *codec.Message, v interface{}) error {
|
||||
func (protoCodec) Name() string {
|
||||
return "proto"
|
||||
}
|
||||
|
||||
func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
switch m := v.(type) {
|
||||
case proto.Message:
|
||||
return jsonpbMarshaler.Marshal(m)
|
||||
case oldproto.Message:
|
||||
buf := b.NewBuffer(nil)
|
||||
err := oldjsonpbMarshaler.Marshal(buf, m)
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
return json.Marshal(v)
|
||||
}
|
||||
|
||||
func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
switch m := v.(type) {
|
||||
case proto.Message:
|
||||
return jsonpbUnmarshaler.Unmarshal(data, m)
|
||||
case oldproto.Message:
|
||||
return oldjsonpbUnmarshaler.Unmarshal(b.NewReader(data), m)
|
||||
}
|
||||
return json.Unmarshal(data, v)
|
||||
}
|
||||
|
||||
func (jsonCodec) Name() string {
|
||||
return "json"
|
||||
}
|
||||
|
||||
func (bytesCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
switch m := v.(type) {
|
||||
case *[]byte:
|
||||
return *m, nil
|
||||
}
|
||||
return nil, codec.ErrInvalidMessage
|
||||
}
|
||||
|
||||
func (bytesCodec) Unmarshal(data []byte, v interface{}) error {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
switch m := v.(type) {
|
||||
case *[]byte:
|
||||
*m = data
|
||||
return nil
|
||||
}
|
||||
return codec.ErrInvalidMessage
|
||||
}
|
||||
|
||||
func (bytesCodec) Name() string {
|
||||
return "bytes"
|
||||
}
|
||||
|
||||
type grpcCodec struct {
|
||||
grpc.ServerStream
|
||||
// headers
|
||||
id string
|
||||
target string
|
||||
method string
|
||||
endpoint string
|
||||
|
||||
c encoding.Codec
|
||||
}
|
||||
|
||||
func (g *grpcCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
||||
md, _ := metadata.FromIncomingContext(g.ServerStream.Context())
|
||||
if m == nil {
|
||||
m = new(codec.Message)
|
||||
}
|
||||
if m.Header == nil {
|
||||
m.Header = make(map[string]string, len(md))
|
||||
}
|
||||
for k, v := range md {
|
||||
m.Header[k] = strings.Join(v, ",")
|
||||
}
|
||||
m.Id = g.id
|
||||
m.Target = g.target
|
||||
m.Method = g.method
|
||||
m.Endpoint = g.endpoint
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcCodec) ReadBody(v interface{}) error {
|
||||
// caller has requested a frame
|
||||
switch m := v.(type) {
|
||||
case *bytes.Frame:
|
||||
return g.ServerStream.RecvMsg(m)
|
||||
}
|
||||
return g.ServerStream.RecvMsg(v)
|
||||
}
|
||||
|
||||
func (g *grpcCodec) Write(m *codec.Message, v interface{}) error {
|
||||
// if we don't have a body
|
||||
if v != nil {
|
||||
b, err := w.Marshal(v)
|
||||
b, err := g.c.Marshal(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Body = b
|
||||
}
|
||||
// write the body using the framing codec
|
||||
_, err := conn.Write(m.Body)
|
||||
return err
|
||||
return g.ServerStream.SendMsg(&bytes.Frame{Data: m.Body})
|
||||
}
|
||||
|
||||
func (g *grpcCodec) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcCodec) String() string {
|
||||
return "grpc"
|
||||
}
|
||||
|
16
context.go
Normal file
16
context.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
func setServerOption(k, v interface{}) server.Option {
|
||||
return func(o *server.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, k, v)
|
||||
}
|
||||
}
|
49
error.go
49
error.go
@@ -6,23 +6,26 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
pb "github.com/unistack-org/micro-server-grpc/errors"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
var errMapping = map[int32]codes.Code{
|
||||
http.StatusOK: codes.OK,
|
||||
http.StatusBadRequest: codes.InvalidArgument,
|
||||
http.StatusRequestTimeout: codes.DeadlineExceeded,
|
||||
http.StatusNotFound: codes.NotFound,
|
||||
http.StatusConflict: codes.AlreadyExists,
|
||||
http.StatusForbidden: codes.PermissionDenied,
|
||||
http.StatusUnauthorized: codes.Unauthenticated,
|
||||
http.StatusPreconditionFailed: codes.FailedPrecondition,
|
||||
http.StatusNotImplemented: codes.Unimplemented,
|
||||
http.StatusInternalServerError: codes.Internal,
|
||||
http.StatusServiceUnavailable: codes.Unavailable,
|
||||
}
|
||||
var (
|
||||
errMapping = map[int32]codes.Code{
|
||||
http.StatusOK: codes.OK,
|
||||
http.StatusBadRequest: codes.InvalidArgument,
|
||||
http.StatusRequestTimeout: codes.DeadlineExceeded,
|
||||
http.StatusNotFound: codes.NotFound,
|
||||
http.StatusConflict: codes.AlreadyExists,
|
||||
http.StatusForbidden: codes.PermissionDenied,
|
||||
http.StatusUnauthorized: codes.Unauthenticated,
|
||||
http.StatusPreconditionFailed: codes.FailedPrecondition,
|
||||
http.StatusNotImplemented: codes.Unimplemented,
|
||||
http.StatusInternalServerError: codes.Internal,
|
||||
http.StatusServiceUnavailable: codes.Unavailable,
|
||||
}
|
||||
)
|
||||
|
||||
// convertCode converts a standard Go error into its canonical code. Note that
|
||||
// this is only used to translate the error returned by the server applications.
|
||||
@@ -58,7 +61,10 @@ func microError(err error) codes.Code {
|
||||
}
|
||||
|
||||
var ec int32
|
||||
if verr, ok := err.(*errors.Error); ok {
|
||||
switch verr := err.(type) {
|
||||
case *errors.Error:
|
||||
ec = verr.Code
|
||||
case *pb.Error:
|
||||
ec = verr.Code
|
||||
}
|
||||
|
||||
@@ -68,3 +74,16 @@ func microError(err error) codes.Code {
|
||||
|
||||
return codes.Unknown
|
||||
}
|
||||
|
||||
func pbError(err error) *pb.Error {
|
||||
switch verr := err.(type) {
|
||||
case nil:
|
||||
return nil
|
||||
case *errors.Error:
|
||||
return &pb.Error{Id: verr.Id, Code: verr.Code, Detail: verr.Detail, Status: verr.Status}
|
||||
case *pb.Error:
|
||||
return verr
|
||||
default:
|
||||
return &pb.Error{Code: 500, Detail: err.Error()}
|
||||
}
|
||||
}
|
||||
|
159
errors/errors.go
Normal file
159
errors/errors.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package errors
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func (e *Error) Error() string {
|
||||
return fmt.Sprintf(`{"id":"%s","code":%d,"detail":"%s","status":"%s"}`, e.Id, e.Code, e.Detail, e.Status)
|
||||
}
|
||||
|
||||
// New generates a custom error.
|
||||
func New(id, detail string, code int32) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: code,
|
||||
Detail: detail,
|
||||
// Status: http.StatusText(int(code)),
|
||||
}
|
||||
}
|
||||
|
||||
// BadRequest generates a 400 error.
|
||||
func BadRequest(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 400,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
// Status: http.StatusText(400),
|
||||
}
|
||||
}
|
||||
|
||||
// Unauthorized generates a 401 error.
|
||||
func Unauthorized(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 401,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
// Status: http.StatusText(401),
|
||||
}
|
||||
}
|
||||
|
||||
// Forbidden generates a 403 error.
|
||||
func Forbidden(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 403,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
// Status: http.StatusText(403),
|
||||
}
|
||||
}
|
||||
|
||||
// NotFound generates a 404 error.
|
||||
func NotFound(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 404,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
// Status: http.StatusText(404),
|
||||
}
|
||||
}
|
||||
|
||||
// MethodNotAllowed generates a 405 error.
|
||||
func MethodNotAllowed(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 405,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(405),
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout generates a 408 error.
|
||||
func Timeout(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 408,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(408),
|
||||
}
|
||||
}
|
||||
|
||||
// Conflict generates a 409 error.
|
||||
func Conflict(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 409,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(409),
|
||||
}
|
||||
}
|
||||
|
||||
// InternalServerError generates a 500 error.
|
||||
func InternalServerError(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 500,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(500),
|
||||
}
|
||||
}
|
||||
|
||||
// NotImplemented generates a 501 error
|
||||
func NotImplemented(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 501,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(501),
|
||||
}
|
||||
}
|
||||
|
||||
// BadGateway generates a 502 error
|
||||
func BadGateway(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 502,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
// Status: http.StatusText(502),
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceUnavailable generates a 503 error
|
||||
func ServiceUnavailable(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 503,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(503),
|
||||
}
|
||||
}
|
||||
|
||||
// GatewayTimeout generates a 504 error
|
||||
func GatewayTimeout(id, format string, a ...interface{}) error {
|
||||
return &Error{
|
||||
Id: id,
|
||||
Code: 504,
|
||||
Detail: fmt.Sprintf(format, a...),
|
||||
//Status: http.StatusText(504),
|
||||
}
|
||||
}
|
||||
|
||||
// Equal tries to compare errors
|
||||
func Equal(err1 error, err2 error) bool {
|
||||
verr1, ok1 := err1.(*Error)
|
||||
verr2, ok2 := err2.(*Error)
|
||||
|
||||
if ok1 != ok2 {
|
||||
return false
|
||||
}
|
||||
|
||||
if !ok1 {
|
||||
return err1 == err2
|
||||
}
|
||||
|
||||
if verr1.Code != verr2.Code {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
173
errors/errors.pb.go
Normal file
173
errors/errors.pb.go
Normal file
@@ -0,0 +1,173 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.25.0-devel
|
||||
// protoc v3.6.1
|
||||
// source: errors.proto
|
||||
|
||||
package errors
|
||||
|
||||
import (
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// Verify that this generated code is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||
)
|
||||
|
||||
// This is a compile-time assertion that a sufficiently up-to-date version
|
||||
// of the legacy proto package is being used.
|
||||
const _ = proto.ProtoPackageIsVersion4
|
||||
|
||||
type Error struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
|
||||
Code int32 `protobuf:"varint,2,opt,name=code,proto3" json:"code,omitempty"`
|
||||
Detail string `protobuf:"bytes,3,opt,name=detail,proto3" json:"detail,omitempty"`
|
||||
Status string `protobuf:"bytes,4,opt,name=status,proto3" json:"status,omitempty"`
|
||||
}
|
||||
|
||||
func (x *Error) Reset() {
|
||||
*x = Error{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_errors_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *Error) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*Error) ProtoMessage() {}
|
||||
|
||||
func (x *Error) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_errors_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use Error.ProtoReflect.Descriptor instead.
|
||||
func (*Error) Descriptor() ([]byte, []int) {
|
||||
return file_errors_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *Error) GetId() string {
|
||||
if x != nil {
|
||||
return x.Id
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Error) GetCode() int32 {
|
||||
if x != nil {
|
||||
return x.Code
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Error) GetDetail() string {
|
||||
if x != nil {
|
||||
return x.Detail
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Error) GetStatus() string {
|
||||
if x != nil {
|
||||
return x.Status
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
var File_errors_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_errors_proto_rawDesc = []byte{
|
||||
0x0a, 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06,
|
||||
0x65, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x22, 0x5b, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12,
|
||||
0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12,
|
||||
0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63,
|
||||
0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x18, 0x03, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x12, 0x16, 0x0a, 0x06, 0x73,
|
||||
0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61,
|
||||
0x74, 0x75, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
file_errors_proto_rawDescOnce sync.Once
|
||||
file_errors_proto_rawDescData = file_errors_proto_rawDesc
|
||||
)
|
||||
|
||||
func file_errors_proto_rawDescGZIP() []byte {
|
||||
file_errors_proto_rawDescOnce.Do(func() {
|
||||
file_errors_proto_rawDescData = protoimpl.X.CompressGZIP(file_errors_proto_rawDescData)
|
||||
})
|
||||
return file_errors_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_errors_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
|
||||
var file_errors_proto_goTypes = []interface{}{
|
||||
(*Error)(nil), // 0: errors.Error
|
||||
}
|
||||
var file_errors_proto_depIdxs = []int32{
|
||||
0, // [0:0] is the sub-list for method output_type
|
||||
0, // [0:0] is the sub-list for method input_type
|
||||
0, // [0:0] is the sub-list for extension type_name
|
||||
0, // [0:0] is the sub-list for extension extendee
|
||||
0, // [0:0] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_errors_proto_init() }
|
||||
func file_errors_proto_init() {
|
||||
if File_errors_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_errors_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*Error); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_errors_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 1,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
GoTypes: file_errors_proto_goTypes,
|
||||
DependencyIndexes: file_errors_proto_depIdxs,
|
||||
MessageInfos: file_errors_proto_msgTypes,
|
||||
}.Build()
|
||||
File_errors_proto = out.File
|
||||
file_errors_proto_rawDesc = nil
|
||||
file_errors_proto_goTypes = nil
|
||||
file_errors_proto_depIdxs = nil
|
||||
}
|
21
errors/errors.pb.micro.go
Normal file
21
errors/errors.pb.micro.go
Normal file
@@ -0,0 +1,21 @@
|
||||
// Code generated by protoc-gen-micro. DO NOT EDIT.
|
||||
// source: errors.proto
|
||||
|
||||
package errors
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
math "math"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
10
errors/errors.proto
Normal file
10
errors/errors.proto
Normal file
@@ -0,0 +1,10 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package errors;
|
||||
|
||||
message Error {
|
||||
string id = 1;
|
||||
int32 code = 2;
|
||||
string detail = 3;
|
||||
string status = 4;
|
||||
};
|
122
extractor.go
Normal file
122
extractor.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
)
|
||||
|
||||
func extractValue(v reflect.Type, d int) *registry.Value {
|
||||
if d == 3 {
|
||||
return nil
|
||||
}
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if v.Kind() == reflect.Ptr {
|
||||
v = v.Elem()
|
||||
}
|
||||
|
||||
arg := ®istry.Value{
|
||||
Name: v.Name(),
|
||||
Type: v.Name(),
|
||||
}
|
||||
|
||||
switch v.Kind() {
|
||||
case reflect.Struct:
|
||||
for i := 0; i < v.NumField(); i++ {
|
||||
f := v.Field(i)
|
||||
val := extractValue(f.Type, d+1)
|
||||
if val == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// if we can find a json tag use it
|
||||
if tags := f.Tag.Get("json"); len(tags) > 0 {
|
||||
parts := strings.Split(tags, ",")
|
||||
if parts[0] == "-" || parts[0] == "omitempty" {
|
||||
continue
|
||||
}
|
||||
val.Name = parts[0]
|
||||
}
|
||||
|
||||
// if there's no name default it
|
||||
if len(val.Name) == 0 {
|
||||
val.Name = v.Field(i).Name
|
||||
}
|
||||
|
||||
arg.Values = append(arg.Values, val)
|
||||
}
|
||||
case reflect.Slice:
|
||||
p := v.Elem()
|
||||
if p.Kind() == reflect.Ptr {
|
||||
p = p.Elem()
|
||||
}
|
||||
arg.Type = "[]" + p.Name()
|
||||
}
|
||||
|
||||
return arg
|
||||
}
|
||||
|
||||
func extractEndpoint(method reflect.Method) *registry.Endpoint {
|
||||
if method.PkgPath != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var rspType, reqType reflect.Type
|
||||
var stream bool
|
||||
mt := method.Type
|
||||
|
||||
switch mt.NumIn() {
|
||||
case 3:
|
||||
reqType = mt.In(1)
|
||||
rspType = mt.In(2)
|
||||
case 4:
|
||||
reqType = mt.In(2)
|
||||
rspType = mt.In(3)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
// are we dealing with a stream?
|
||||
switch rspType.Kind() {
|
||||
case reflect.Func, reflect.Interface:
|
||||
stream = true
|
||||
}
|
||||
|
||||
request := extractValue(reqType, 0)
|
||||
response := extractValue(rspType, 0)
|
||||
|
||||
ep := ®istry.Endpoint{
|
||||
Name: method.Name,
|
||||
Request: request,
|
||||
Response: response,
|
||||
Metadata: make(map[string]string),
|
||||
}
|
||||
|
||||
if stream {
|
||||
ep.Metadata = map[string]string{
|
||||
"stream": fmt.Sprintf("%v", stream),
|
||||
}
|
||||
}
|
||||
|
||||
return ep
|
||||
}
|
||||
|
||||
func extractSubValue(typ reflect.Type) *registry.Value {
|
||||
var reqType reflect.Type
|
||||
switch typ.NumIn() {
|
||||
case 1:
|
||||
reqType = typ.In(0)
|
||||
case 2:
|
||||
reqType = typ.In(1)
|
||||
case 3:
|
||||
reqType = typ.In(2)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
return extractValue(reqType, 0)
|
||||
}
|
65
extractor_test.go
Normal file
65
extractor_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
)
|
||||
|
||||
type testHandler struct{}
|
||||
|
||||
type testRequest struct{}
|
||||
|
||||
type testResponse struct{}
|
||||
|
||||
func (t *testHandler) Test(ctx context.Context, req *testRequest, rsp *testResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestExtractEndpoint(t *testing.T) {
|
||||
handler := &testHandler{}
|
||||
typ := reflect.TypeOf(handler)
|
||||
|
||||
var endpoints []*registry.Endpoint
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
if e := extractEndpoint(typ.Method(m)); e != nil {
|
||||
endpoints = append(endpoints, e)
|
||||
}
|
||||
}
|
||||
|
||||
if i := len(endpoints); i != 1 {
|
||||
t.Errorf("Expected 1 endpoint, have %d", i)
|
||||
}
|
||||
|
||||
if endpoints[0].Name != "Test" {
|
||||
t.Errorf("Expected handler Test, got %s", endpoints[0].Name)
|
||||
}
|
||||
|
||||
if endpoints[0].Request == nil {
|
||||
t.Error("Expected non nil request")
|
||||
}
|
||||
|
||||
if endpoints[0].Response == nil {
|
||||
t.Error("Expected non nil request")
|
||||
}
|
||||
|
||||
if endpoints[0].Request.Name != "testRequest" {
|
||||
t.Errorf("Expected testRequest got %s", endpoints[0].Request.Name)
|
||||
}
|
||||
|
||||
if endpoints[0].Response.Name != "testResponse" {
|
||||
t.Errorf("Expected testResponse got %s", endpoints[0].Response.Name)
|
||||
}
|
||||
|
||||
if endpoints[0].Request.Type != "testRequest" {
|
||||
t.Errorf("Expected testRequest type got %s", endpoints[0].Request.Type)
|
||||
}
|
||||
|
||||
if endpoints[0].Response.Type != "testResponse" {
|
||||
t.Errorf("Expected testResponse type got %s", endpoints[0].Response.Type)
|
||||
}
|
||||
|
||||
}
|
3
generate.go
Normal file
3
generate.go
Normal file
@@ -0,0 +1,3 @@
|
||||
package grpc
|
||||
|
||||
//go:generate protoc -I./errors -I. --go-grpc_out=paths=source_relative:./errors --go_out=paths=source_relative:./errors --micro_out=paths=source_relative:./errors errors/errors.proto
|
19
go.mod
19
go.mod
@@ -1,11 +1,16 @@
|
||||
module go.unistack.org/micro-server-grpc/v3
|
||||
module github.com/unistack-org/micro-server-grpc
|
||||
|
||||
go 1.16
|
||||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2
|
||||
go.unistack.org/micro/v3 v3.10.14
|
||||
golang.org/x/net v0.5.0
|
||||
google.golang.org/grpc v1.52.3
|
||||
google.golang.org/protobuf v1.28.1
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/google/go-cmp v0.5.1 // indirect
|
||||
github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844
|
||||
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920124807-9b11ea527aeb
|
||||
golang.org/x/net v0.0.0-20200904194848-62affa334b73
|
||||
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 // indirect
|
||||
golang.org/x/text v0.3.3 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||
google.golang.org/grpc v1.31.1
|
||||
google.golang.org/protobuf v1.25.0
|
||||
)
|
||||
|
26
handler.go
26
handler.go
@@ -3,28 +3,34 @@ package grpc
|
||||
import (
|
||||
"reflect"
|
||||
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
type rpcHandler struct {
|
||||
opts server.HandlerOptions
|
||||
handler interface{}
|
||||
name string
|
||||
endpoints []*register.Endpoint
|
||||
handler interface{}
|
||||
endpoints []*registry.Endpoint
|
||||
opts server.HandlerOptions
|
||||
}
|
||||
|
||||
func newRPCHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
options := server.NewHandlerOptions(opts...)
|
||||
func newRpcHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
options := server.HandlerOptions{
|
||||
Metadata: make(map[string]map[string]string),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
typ := reflect.TypeOf(handler)
|
||||
hdlr := reflect.ValueOf(handler)
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
var endpoints []*register.Endpoint
|
||||
var endpoints []*registry.Endpoint
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
|
||||
if e := extractEndpoint(typ.Method(m)); e != nil {
|
||||
e.Name = name + "." + e.Name
|
||||
|
||||
for k, v := range options.Metadata[e.Name] {
|
||||
@@ -51,7 +57,7 @@ func (r *rpcHandler) Handler() interface{} {
|
||||
return r.handler
|
||||
}
|
||||
|
||||
func (r *rpcHandler) Endpoints() []*register.Endpoint {
|
||||
func (r *rpcHandler) Endpoints() []*registry.Endpoint {
|
||||
return r.endpoints
|
||||
}
|
||||
|
||||
|
47
options.go
47
options.go
@@ -2,19 +2,21 @@ package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/encoding"
|
||||
)
|
||||
|
||||
type (
|
||||
codecsKey struct{}
|
||||
grpcOptions struct{}
|
||||
maxMsgSizeKey struct{}
|
||||
reflectionKey struct{}
|
||||
unknownServiceHandlerKey struct{}
|
||||
)
|
||||
type codecsKey struct{}
|
||||
type grpcOptions struct{}
|
||||
type netListener struct{}
|
||||
type maxMsgSizeKey struct{}
|
||||
type maxConnKey struct{}
|
||||
type tlsAuth struct{}
|
||||
type reflectionKey struct{}
|
||||
|
||||
// gRPC Codec to be used to encode/decode requests for a given content type
|
||||
func Codec(contentType string, c encoding.Codec) server.Option {
|
||||
@@ -31,24 +33,35 @@ func Codec(contentType string, c encoding.Codec) server.Option {
|
||||
}
|
||||
}
|
||||
|
||||
// AuthTLS should be used to setup a secure authentication using TLS
|
||||
func AuthTLS(t *tls.Config) server.Option {
|
||||
return setServerOption(tlsAuth{}, t)
|
||||
}
|
||||
|
||||
// MaxConn specifies maximum number of max simultaneous connections to server
|
||||
func MaxConn(n int) server.Option {
|
||||
return setServerOption(maxConnKey{}, n)
|
||||
}
|
||||
|
||||
// Listener specifies the net.Listener to use instead of the default
|
||||
func Listener(l net.Listener) server.Option {
|
||||
return setServerOption(netListener{}, l)
|
||||
}
|
||||
|
||||
// Options to be used to configure gRPC options
|
||||
func Options(opts ...grpc.ServerOption) server.Option {
|
||||
return server.SetOption(grpcOptions{}, opts)
|
||||
return setServerOption(grpcOptions{}, opts)
|
||||
}
|
||||
|
||||
//
|
||||
// MaxMsgSize set the maximum message in bytes the server can receive and
|
||||
// send. Default maximum message size is 4 MB.
|
||||
// send. Default maximum message size is 4 MB.
|
||||
//
|
||||
func MaxMsgSize(s int) server.Option {
|
||||
return server.SetOption(maxMsgSizeKey{}, s)
|
||||
return setServerOption(maxMsgSizeKey{}, s)
|
||||
}
|
||||
|
||||
// Reflection enables reflection support in grpc server
|
||||
func Reflection(b bool) server.Option {
|
||||
return server.SetOption(reflectionKey{}, b)
|
||||
}
|
||||
|
||||
// UnknownServiceHandler enables support for all services
|
||||
func UnknownServiceHandler(h grpc.StreamHandler) server.Option {
|
||||
return server.SetOption(unknownServiceHandlerKey{}, h)
|
||||
return setServerOption(reflectionKey{}, b)
|
||||
}
|
||||
|
@@ -1,5 +1,3 @@
|
||||
// +build ignore
|
||||
|
||||
/*
|
||||
*
|
||||
* Copyright 2016 gRPC authors.
|
||||
|
52
request.go
52
request.go
@@ -1,36 +1,28 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
)
|
||||
|
||||
var (
|
||||
_ server.Request = &rpcRequest{}
|
||||
_ server.Message = &rpcMessage{}
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro-codec-bytes"
|
||||
)
|
||||
|
||||
type rpcRequest struct {
|
||||
rw io.ReadWriter
|
||||
payload interface{}
|
||||
codec codec.Codec
|
||||
header metadata.Metadata
|
||||
method string
|
||||
endpoint string
|
||||
contentType string
|
||||
service string
|
||||
method string
|
||||
contentType string
|
||||
codec codec.Codec
|
||||
header map[string]string
|
||||
body []byte
|
||||
stream bool
|
||||
payload interface{}
|
||||
}
|
||||
|
||||
type rpcMessage struct {
|
||||
payload interface{}
|
||||
codec codec.Codec
|
||||
header metadata.Metadata
|
||||
topic string
|
||||
contentType string
|
||||
payload interface{}
|
||||
header map[string]string
|
||||
body []byte
|
||||
codec codec.Codec
|
||||
}
|
||||
|
||||
func (r *rpcRequest) ContentType() string {
|
||||
@@ -46,20 +38,20 @@ func (r *rpcRequest) Method() string {
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Endpoint() string {
|
||||
return r.endpoint
|
||||
return r.method
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Codec() codec.Codec {
|
||||
func (r *rpcRequest) Codec() codec.Reader {
|
||||
return r.codec
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Header() metadata.Metadata {
|
||||
func (r *rpcRequest) Header() map[string]string {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *rpcRequest) Read() ([]byte, error) {
|
||||
f := &codec.Frame{}
|
||||
if err := r.codec.ReadBody(r.rw, f); err != nil {
|
||||
f := &bytes.Frame{}
|
||||
if err := r.codec.ReadBody(f); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return f.Data, nil
|
||||
@@ -81,14 +73,18 @@ func (r *rpcMessage) Topic() string {
|
||||
return r.topic
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Body() interface{} {
|
||||
func (r *rpcMessage) Payload() interface{} {
|
||||
return r.payload
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Header() metadata.Metadata {
|
||||
func (r *rpcMessage) Header() map[string]string {
|
||||
return r.header
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Codec() codec.Codec {
|
||||
func (r *rpcMessage) Body() []byte {
|
||||
return r.body
|
||||
}
|
||||
|
||||
func (r *rpcMessage) Codec() codec.Reader {
|
||||
return r.codec
|
||||
}
|
||||
|
17
response.go
17
response.go
@@ -1,33 +1,26 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
)
|
||||
|
||||
var _ server.Response = &rpcResponse{}
|
||||
|
||||
type rpcResponse struct {
|
||||
rw io.ReadWriter
|
||||
header metadata.Metadata
|
||||
header map[string]string
|
||||
codec codec.Codec
|
||||
}
|
||||
|
||||
func (r *rpcResponse) Codec() codec.Codec {
|
||||
func (r *rpcResponse) Codec() codec.Writer {
|
||||
return r.codec
|
||||
}
|
||||
|
||||
func (r *rpcResponse) WriteHeader(hdr metadata.Metadata) {
|
||||
func (r *rpcResponse) WriteHeader(hdr map[string]string) {
|
||||
for k, v := range hdr {
|
||||
r.header[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rpcResponse) Write(b []byte) error {
|
||||
return r.codec.Write(r.rw, &codec.Message{
|
||||
return r.codec.Write(&codec.Message{
|
||||
Header: r.header,
|
||||
Body: b,
|
||||
}, nil)
|
||||
|
92
server.go
92
server.go
@@ -14,35 +14,38 @@ import (
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
// because Typeof takes an empty interface value. This is annoying.
|
||||
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||
var (
|
||||
// Precompute the reflect type for error. Can't use error directly
|
||||
// because Typeof takes an empty interface value. This is annoying.
|
||||
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
|
||||
)
|
||||
|
||||
type methodType struct {
|
||||
method reflect.Method
|
||||
ArgType reflect.Type
|
||||
ReplyType reflect.Type
|
||||
ContextType reflect.Type
|
||||
method reflect.Method
|
||||
stream bool
|
||||
}
|
||||
|
||||
// type reflectionType func(context.Context, server.Stream) error
|
||||
type reflectionType func(context.Context, server.Stream) error
|
||||
|
||||
type service struct {
|
||||
typ reflect.Type
|
||||
method map[string]*methodType
|
||||
rcvr reflect.Value
|
||||
name string
|
||||
name string // name of service
|
||||
rcvr reflect.Value // receiver of methods for the service
|
||||
typ reflect.Type // type of the receiver
|
||||
method map[string]*methodType // registered methods
|
||||
}
|
||||
|
||||
// server represents an RPC Server.
|
||||
type rServer struct {
|
||||
mu sync.RWMutex // protects the serviceMap
|
||||
serviceMap map[string]*service
|
||||
mu sync.RWMutex
|
||||
// reflection bool
|
||||
reflection bool
|
||||
}
|
||||
|
||||
// Is this an exported - upper case - name?
|
||||
@@ -63,7 +66,7 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
|
||||
|
||||
// prepareEndpoint() returns a methodType for the provided method or nil
|
||||
// in case if the method was unsuitable.
|
||||
func prepareEndpoint(method reflect.Method) (*methodType, error) {
|
||||
func prepareEndpoint(method reflect.Method) *methodType {
|
||||
mtype := method.Type
|
||||
mname := method.Name
|
||||
var replyType, argType, contextType reflect.Type
|
||||
@@ -71,7 +74,7 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) {
|
||||
|
||||
// Endpoint() must be exported.
|
||||
if method.PkgPath != "" {
|
||||
return nil, fmt.Errorf("Endpoint must be exported")
|
||||
return nil
|
||||
}
|
||||
|
||||
switch mtype.NumIn() {
|
||||
@@ -86,41 +89,63 @@ func prepareEndpoint(method reflect.Method) (*methodType, error) {
|
||||
replyType = mtype.In(3)
|
||||
contextType = mtype.In(1)
|
||||
default:
|
||||
return nil, fmt.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("method %v of %v has wrong number of ins: %v", mname, mtype, mtype.NumIn())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
switch stream {
|
||||
case true:
|
||||
if stream {
|
||||
// check stream type
|
||||
streamType := reflect.TypeOf((*server.Stream)(nil)).Elem()
|
||||
if !argType.Implements(streamType) {
|
||||
return nil, fmt.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("%v argument does not implement Streamer interface: %v", mname, argType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
} else {
|
||||
// if not stream check the replyType
|
||||
|
||||
// First arg need not be a pointer.
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return nil, fmt.Errorf("%v argument type not exported: %v", mname, argType)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("%v argument type not exported: %v", mname, argType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if replyType.Kind() != reflect.Ptr {
|
||||
return nil, fmt.Errorf("method %v reply type not a pointer: %v", mname, replyType)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("method %v reply type not a pointer: %v", mname, replyType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reply type must be exported.
|
||||
if !isExportedOrBuiltinType(replyType) {
|
||||
return nil, fmt.Errorf("method %v reply type not exported: %v", mname, replyType)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("method %v reply type not exported: %v", mname, replyType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Endpoint() needs one out.
|
||||
if mtype.NumOut() != 1 {
|
||||
return nil, fmt.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("method %v has wrong number of outs: %v", mname, mtype.NumOut())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// The return type of the method must be error.
|
||||
if returnType := mtype.Out(0); returnType != typeOfError {
|
||||
return nil, fmt.Errorf("method %v returns %v not error", mname, returnType.String())
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("method %v returns %v not error", mname, returnType.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}, nil
|
||||
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
|
||||
}
|
||||
|
||||
func (server *rServer) register(rcvr interface{}) error {
|
||||
@@ -137,7 +162,11 @@ func (server *rServer) register(rcvr interface{}) error {
|
||||
return fmt.Errorf("rpc: no service name for type %v", s.typ.String())
|
||||
}
|
||||
if !isExported(sname) {
|
||||
return fmt.Errorf("rpc Register: type %s is not exported", sname)
|
||||
s := "rpc Register: type " + sname + " is not exported"
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(s)
|
||||
}
|
||||
return fmt.Errorf(s)
|
||||
}
|
||||
if _, present := server.serviceMap[sname]; present {
|
||||
return fmt.Errorf("rpc: service already defined: " + sname)
|
||||
@@ -148,16 +177,17 @@ func (server *rServer) register(rcvr interface{}) error {
|
||||
// Install the methods
|
||||
for m := 0; m < s.typ.NumMethod(); m++ {
|
||||
method := s.typ.Method(m)
|
||||
mt, err := prepareEndpoint(method)
|
||||
if mt != nil && err == nil {
|
||||
if mt := prepareEndpoint(method); mt != nil {
|
||||
s.method[method.Name] = mt
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.method) == 0 {
|
||||
return fmt.Errorf("rpc Register: type %s has no exported methods of suitable type", sname)
|
||||
s := "rpc Register: type " + sname + " has no exported methods of suitable type"
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(s)
|
||||
}
|
||||
return fmt.Errorf(s)
|
||||
}
|
||||
server.serviceMap[s.name] = s
|
||||
return nil
|
||||
|
@@ -3,7 +3,7 @@ package grpc
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
119
subscriber.go
119
subscriber.go
@@ -7,18 +7,22 @@ import (
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
const (
|
||||
subSig = "func(context.Context, interface{}) error"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
method reflect.Value
|
||||
reqType reflect.Type
|
||||
ctxType reflect.Type
|
||||
method reflect.Value
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
@@ -27,14 +31,17 @@ type subscriber struct {
|
||||
typ reflect.Type
|
||||
subscriber interface{}
|
||||
handlers []*handler
|
||||
endpoints []*register.Endpoint
|
||||
endpoints []*registry.Endpoint
|
||||
opts server.SubscriberOptions
|
||||
}
|
||||
|
||||
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
options := server.NewSubscriberOptions(opts...)
|
||||
options := server.NewSubscriberOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
var endpoints []*register.Endpoint
|
||||
var endpoints []*registry.Endpoint
|
||||
var handlers []*handler
|
||||
|
||||
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
|
||||
@@ -52,9 +59,9 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
||||
|
||||
handlers = append(handlers, h)
|
||||
|
||||
endpoints = append(endpoints, ®ister.Endpoint{
|
||||
endpoints = append(endpoints, ®istry.Endpoint{
|
||||
Name: "Func",
|
||||
Request: register.ExtractSubValue(typ),
|
||||
Request: extractSubValue(typ),
|
||||
Metadata: map[string]string{
|
||||
"topic": topic,
|
||||
"subscriber": "true",
|
||||
@@ -80,9 +87,9 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
||||
|
||||
handlers = append(handlers, h)
|
||||
|
||||
endpoints = append(endpoints, ®ister.Endpoint{
|
||||
endpoints = append(endpoints, ®istry.Endpoint{
|
||||
Name: name + "." + method.Name,
|
||||
Request: register.ExtractSubValue(method.Type),
|
||||
Request: extractSubValue(method.Type),
|
||||
Metadata: map[string]string{
|
||||
"topic": topic,
|
||||
"subscriber": "true",
|
||||
@@ -102,13 +109,67 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
func validateSubscriber(sub server.Subscriber) error {
|
||||
typ := reflect.TypeOf(sub.Subscriber())
|
||||
var argType reflect.Type
|
||||
|
||||
if typ.Kind() == reflect.Func {
|
||||
name := "Func"
|
||||
switch typ.NumIn() {
|
||||
case 2:
|
||||
argType = typ.In(1)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
|
||||
}
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if typ.NumOut() != 1 {
|
||||
return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s",
|
||||
name, typ.NumOut(), subSig)
|
||||
}
|
||||
if returnType := typ.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
|
||||
}
|
||||
} else {
|
||||
hdlr := reflect.ValueOf(sub.Subscriber())
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
method := typ.Method(m)
|
||||
|
||||
switch method.Type.NumIn() {
|
||||
case 3:
|
||||
argType = method.Type.In(2)
|
||||
default:
|
||||
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
|
||||
name, method.Name, method.Type.NumIn(), subSig)
|
||||
}
|
||||
|
||||
if !isExportedOrBuiltinType(argType) {
|
||||
return fmt.Errorf("%v argument type not exported: %v", name, argType)
|
||||
}
|
||||
if method.Type.NumOut() != 1 {
|
||||
return fmt.Errorf(
|
||||
"subscriber %v.%v has wrong number of outs: %v require signature %s",
|
||||
name, method.Name, method.Type.NumOut(), subSig)
|
||||
}
|
||||
if returnType := method.Type.Out(0); returnType != typeOfError {
|
||||
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if g.opts.Logger.V(logger.ErrorLevel) {
|
||||
g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r)
|
||||
g.opts.Logger.Error(g.opts.Context, string(debug.Stack()))
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error("panic recovered: ", r)
|
||||
logger.Error(string(debug.Stack()))
|
||||
}
|
||||
err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||
}
|
||||
@@ -122,23 +183,20 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
|
||||
|
||||
ct := msg.Header["Content-Type"]
|
||||
if len(ct) == 0 {
|
||||
msg.Header["Content-Type"] = DefaultContentType
|
||||
ct = DefaultContentType
|
||||
msg.Header["Content-Type"] = defaultContentType
|
||||
ct = defaultContentType
|
||||
}
|
||||
cf, err := g.newCodec(ct)
|
||||
cf, err := g.newGRPCCodec(ct)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hdr := make(map[string]string, len(msg.Header))
|
||||
for k, v := range msg.Header {
|
||||
if k == "Content-Type" {
|
||||
continue
|
||||
}
|
||||
hdr[k] = v
|
||||
}
|
||||
|
||||
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
|
||||
delete(hdr, "Content-Type")
|
||||
ctx := metadata.NewContext(sb.opts.Context, hdr)
|
||||
|
||||
results := make(chan error, len(sb.handlers))
|
||||
|
||||
@@ -171,7 +229,7 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
|
||||
vals = append(vals, reflect.ValueOf(ctx))
|
||||
}
|
||||
|
||||
vals = append(vals, reflect.ValueOf(msg.Body()))
|
||||
vals = append(vals, reflect.ValueOf(msg.Payload()))
|
||||
|
||||
returnValues := handler.method.Call(vals)
|
||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||
@@ -191,13 +249,14 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
|
||||
if g.wg != nil {
|
||||
defer g.wg.Done()
|
||||
}
|
||||
cerr := fn(ctx, &rpcMessage{
|
||||
err := fn(ctx, &rpcMessage{
|
||||
topic: sb.topic,
|
||||
contentType: ct,
|
||||
payload: req.Interface(),
|
||||
header: msg.Header,
|
||||
body: msg.Body,
|
||||
})
|
||||
results <- cerr
|
||||
results <- err
|
||||
}()
|
||||
}
|
||||
var errors []string
|
||||
@@ -222,7 +281,7 @@ func (s *subscriber) Subscriber() interface{} {
|
||||
return s.subscriber
|
||||
}
|
||||
|
||||
func (s *subscriber) Endpoints() []*register.Endpoint {
|
||||
func (s *subscriber) Endpoints() []*registry.Endpoint {
|
||||
return s.endpoints
|
||||
}
|
||||
|
||||
|
59
util.go
59
util.go
@@ -1,59 +0,0 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ServiceMethod converts a gRPC method to a Go method
|
||||
// Input:
|
||||
// Foo.Bar, /Foo/Bar, /package.Foo/Bar, /a.package.Foo/Bar
|
||||
// Output:
|
||||
// [Foo, Bar]
|
||||
func serviceMethod(m string) (string, string, error) {
|
||||
if len(m) == 0 {
|
||||
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||
}
|
||||
|
||||
// grpc method
|
||||
if m[0] == '/' {
|
||||
// [ , Foo, Bar]
|
||||
// [ , package.Foo, Bar]
|
||||
// [ , a.package.Foo, Bar]
|
||||
parts := strings.Split(m, "/")
|
||||
if len(parts) != 3 || len(parts[1]) == 0 || len(parts[2]) == 0 {
|
||||
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||
}
|
||||
service := strings.Split(parts[1], ".")
|
||||
return service[len(service)-1], parts[2], nil
|
||||
}
|
||||
|
||||
// non grpc method
|
||||
parts := strings.Split(m, ".")
|
||||
|
||||
// expect [Foo, Bar]
|
||||
if len(parts) != 2 {
|
||||
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||
}
|
||||
|
||||
return parts[0], parts[1], nil
|
||||
}
|
||||
|
||||
/*
|
||||
// ServiceFromMethod returns the service
|
||||
// /service.Foo/Bar => service
|
||||
func serviceFromMethod(m string) string {
|
||||
if len(m) == 0 {
|
||||
return m
|
||||
}
|
||||
if m[0] != '/' {
|
||||
return m
|
||||
}
|
||||
parts := strings.Split(m, "/")
|
||||
if len(parts) < 3 {
|
||||
return m
|
||||
}
|
||||
parts = strings.Split(parts[1], ".")
|
||||
return strings.Join(parts[:len(parts)-1], ".")
|
||||
}
|
||||
*/
|
46
util_test.go
46
util_test.go
@@ -1,46 +0,0 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestServiceMethod(t *testing.T) {
|
||||
type testCase struct {
|
||||
input string
|
||||
service string
|
||||
method string
|
||||
err bool
|
||||
}
|
||||
|
||||
methods := []testCase{
|
||||
{"Foo.Bar", "Foo", "Bar", false},
|
||||
{"/Foo/Bar", "Foo", "Bar", false},
|
||||
{"/package.Foo/Bar", "Foo", "Bar", false},
|
||||
{"/a.package.Foo/Bar", "Foo", "Bar", false},
|
||||
{"a.package.Foo/Bar", "", "", true},
|
||||
{"/Foo/Bar/Baz", "", "", true},
|
||||
{"Foo.Bar.Baz", "", "", true},
|
||||
}
|
||||
for _, test := range methods {
|
||||
service, method, err := serviceMethod(test.input)
|
||||
if err != nil && test.err == true {
|
||||
continue
|
||||
}
|
||||
// unexpected error
|
||||
if err != nil && test.err == false {
|
||||
t.Fatalf("unexpected err %v for %+v", err, test)
|
||||
}
|
||||
// expecter error
|
||||
if test.err == true && err == nil {
|
||||
t.Fatalf("expected error for %+v: got service: %s method: %s", test, service, method)
|
||||
}
|
||||
|
||||
if service != test.service {
|
||||
t.Fatalf("wrong service for %+v: got service: %s method: %s", test, service, method)
|
||||
}
|
||||
|
||||
if method != test.method {
|
||||
t.Fatalf("wrong method for %+v: got service: %s method: %s", test, service, method)
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user