Compare commits
51 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f8d3695962 | ||
ae158ce5fc | |||
8125c9003c | |||
a6f6df257b | |||
6b19cb2fb7 | |||
|
db6fee9760 | ||
309f100532 | |||
|
22ae55f739 | ||
70700a3f86 | |||
7dd327086c | |||
a67efa39ae | |||
|
8ee91422cc | ||
f8ae500c5f | |||
|
7fcc042fbf | ||
3a22f3a900 | |||
|
452a124aee | ||
|
26d3adfe95 | ||
|
18d6584c8f | ||
f26dde5d63 | |||
|
66d3feb263 | ||
5c8effa23f | |||
|
c1e318d0b3 | ||
617764706c | |||
0f3e56f697 | |||
|
b87462c465 | ||
|
e877a92718 | ||
|
c60f0ccb26 | ||
7cf4a8d293 | |||
84b1b862a7 | |||
|
eb17921feb | ||
|
ddeb0a23c3 | ||
|
830d8d8fda | ||
|
ceaff6bf88 | ||
|
01848b8ec7 | ||
|
8ddfa39811 | ||
110a8a8a9c | |||
|
f2587f0876 | ||
eccdad9752 | |||
|
c05996ee6e | ||
734d6fa7af | |||
4c0ca3664a | |||
|
d34ce4f314 | ||
|
d9932033ee | ||
847887de84 | |||
|
10ea1928f4 | ||
|
52d37c6579 | ||
|
0af18ab84b | ||
|
c46d11a2d4 | ||
|
117f48aac5 | ||
b4a2fbdeeb | |||
e36db68d4d |
2
.github/workflows/autoapprove.yml
vendored
2
.github/workflows/autoapprove.yml
vendored
@@ -13,7 +13,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: approve
|
||||
uses: hmarr/auto-approve-action@v2
|
||||
uses: hmarr/auto-approve-action@v3
|
||||
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
|
||||
id: approve
|
||||
with:
|
||||
|
4
.github/workflows/build.yml
vendored
4
.github/workflows/build.yml
vendored
@@ -10,7 +10,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup
|
||||
uses: actions/setup-go@v2
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
- name: checkout
|
||||
@@ -34,7 +34,7 @@ jobs:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.1.0
|
||||
uses: golangci/golangci-lint-action@v3.3.1
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
|
8
.github/workflows/codeql-analysis.yml
vendored
8
.github/workflows/codeql-analysis.yml
vendored
@@ -45,12 +45,12 @@ jobs:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: setup
|
||||
uses: actions/setup-go@v2
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: init
|
||||
uses: github/codeql-action/init@v1
|
||||
uses: github/codeql-action/init@v2
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||
@@ -61,7 +61,7 @@ jobs:
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
- name: autobuild
|
||||
uses: github/codeql-action/autobuild@v1
|
||||
uses: github/codeql-action/autobuild@v2
|
||||
|
||||
# ℹ️ Command-line programs to run using the OS shell.
|
||||
# 📚 https://git.io/JvXDl
|
||||
@@ -75,4 +75,4 @@ jobs:
|
||||
# make release
|
||||
|
||||
- name: analyze
|
||||
uses: github/codeql-action/analyze@v1
|
||||
uses: github/codeql-action/analyze@v2
|
||||
|
2
.github/workflows/dependabot-automerge.yml
vendored
2
.github/workflows/dependabot-automerge.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
||||
steps:
|
||||
- name: metadata
|
||||
id: metadata
|
||||
uses: dependabot/fetch-metadata@v1.3.0
|
||||
uses: dependabot/fetch-metadata@v1.3.5
|
||||
with:
|
||||
github-token: "${{ secrets.TOKEN }}"
|
||||
- name: merge
|
||||
|
4
.github/workflows/pr.yml
vendored
4
.github/workflows/pr.yml
vendored
@@ -10,7 +10,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: setup
|
||||
uses: actions/setup-go@v2
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.17
|
||||
- name: checkout
|
||||
@@ -34,7 +34,7 @@ jobs:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.1.0
|
||||
uses: golangci/golangci-lint-action@v3.3.1
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
|
14
go.mod
14
go.mod
@@ -4,14 +4,8 @@ go 1.16
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/google/gnostic v0.6.8 // indirect
|
||||
github.com/google/go-cmp v0.5.7 // indirect
|
||||
github.com/kr/pretty v0.2.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
go.unistack.org/micro/v3 v3.9.7
|
||||
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
|
||||
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
|
||||
google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6 // indirect
|
||||
google.golang.org/grpc v1.45.0
|
||||
google.golang.org/protobuf v1.28.0
|
||||
go.unistack.org/micro/v3 v3.10.1
|
||||
golang.org/x/net v0.4.0
|
||||
google.golang.org/grpc v1.52.0
|
||||
google.golang.org/protobuf v1.28.1
|
||||
)
|
||||
|
81
grpc.go
81
grpc.go
@@ -35,7 +35,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultContentType = "application/grpc+proto"
|
||||
DefaultContentType = "application/grpc"
|
||||
)
|
||||
|
||||
/*
|
||||
@@ -108,9 +108,6 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
if err := g.opts.Tracer.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.opts.Auth.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := g.opts.Logger.Init(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -200,31 +197,6 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
||||
}
|
||||
|
||||
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
g.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Error(config.Context, "panic recovered: ", r)
|
||||
config.Logger.Error(config.Context, string(debug.Stack()))
|
||||
}
|
||||
err = errors.InternalServerError(g.opts.Name, "panic recovered: %v", r)
|
||||
} else if err != nil {
|
||||
g.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "grpc handler got error: %s", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if g.wg != nil {
|
||||
g.wg.Add(1)
|
||||
defer g.wg.Done()
|
||||
}
|
||||
|
||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||
if !ok {
|
||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||
@@ -235,6 +207,31 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
return status.New(codes.InvalidArgument, err.Error()).Err()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
g.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "panic in %s.%s recovered: %v", serviceName, methodName, r)
|
||||
config.Logger.Error(config.Context, string(debug.Stack()))
|
||||
}
|
||||
err = errors.InternalServerError(g.opts.Name, "panic in %s.%s recovered: %v", serviceName, methodName, r)
|
||||
} else if err != nil {
|
||||
g.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "grpc handler %s.%s got error: %s", serviceName, methodName, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if g.wg != nil {
|
||||
g.wg.Add(1)
|
||||
defer g.wg.Done()
|
||||
}
|
||||
|
||||
// get grpc metadata
|
||||
gmd, ok := gmetadata.FromIncomingContext(stream.Context())
|
||||
if !ok {
|
||||
@@ -286,7 +283,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
|
||||
// get peer from context
|
||||
if p, ok := peer.FromContext(stream.Context()); ok {
|
||||
md["Remote"] = p.Addr.String()
|
||||
md.Set("Remote", p.Addr.String())
|
||||
ctx = peer.NewContext(ctx, p)
|
||||
}
|
||||
|
||||
@@ -387,11 +384,12 @@ func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStrea
|
||||
service: g.opts.Name,
|
||||
contentType: ct,
|
||||
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||
endpoint: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||
payload: argv.Interface(),
|
||||
}
|
||||
// define the handler func
|
||||
fn := func(ctx context.Context, req server.Request, rsp interface{}) (err error) {
|
||||
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), reflect.ValueOf(argv.Interface()), reflect.ValueOf(rsp)})
|
||||
returnValues = function.Call([]reflect.Value{service.rcvr, mtype.prepareContext(ctx), argv, reflect.ValueOf(rsp)})
|
||||
|
||||
// The return value for the method is an error.
|
||||
if rerr := returnValues[0].Interface(); rerr != nil {
|
||||
@@ -409,7 +407,13 @@ func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStrea
|
||||
statusCode := codes.OK
|
||||
statusDesc := ""
|
||||
// execute the handler
|
||||
if appErr := fn(ctx, r, replyv.Interface()); appErr != nil {
|
||||
appErr := fn(ctx, r, replyv.Interface())
|
||||
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
if err = stream.SendHeader(gmetadata.New(outmd)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if appErr != nil {
|
||||
var errStatus *status.Status
|
||||
switch verr := appErr.(type) {
|
||||
case *errors.Error:
|
||||
@@ -501,6 +505,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
|
||||
service: opts.Name,
|
||||
contentType: ct,
|
||||
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||
endpoint: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||
stream: true,
|
||||
}
|
||||
|
||||
@@ -529,7 +534,13 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
|
||||
statusCode := codes.OK
|
||||
statusDesc := ""
|
||||
|
||||
if appErr := fn(ctx, r, ss); appErr != nil {
|
||||
appErr := fn(ctx, r, ss)
|
||||
if outmd, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
if err := stream.SendHeader(gmetadata.New(outmd)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if appErr != nil {
|
||||
var err error
|
||||
var errStatus *status.Status
|
||||
switch verr := appErr.(type) {
|
||||
@@ -585,9 +596,6 @@ func (g *grpcServer) Options() server.Options {
|
||||
}
|
||||
|
||||
func (g *grpcServer) Init(opts ...server.Option) error {
|
||||
if len(opts) == 0 {
|
||||
return nil
|
||||
}
|
||||
return g.configure(opts...)
|
||||
}
|
||||
|
||||
@@ -719,6 +727,7 @@ func (g *grpcServer) Register() error {
|
||||
}
|
||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||
|
Reference in New Issue
Block a user