Compare commits

..

7 Commits

Author SHA1 Message Date
8c42fbb18b update for latest micro
Some checks failed
build / test (push) Failing after 1m20s
build / lint (push) Successful in 9m16s
codeql / analyze (go) (push) Failing after 2m15s
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-03-17 00:32:52 +03:00
f860254f7b Merge pull request 'use time option' (#174) from devstigneev/micro-server-grpc:issue_171 into v3
Some checks failed
build / test (push) Failing after 1m30s
build / lint (push) Successful in 9m19s
codeql / analyze (go) (push) Failing after 2m0s
Reviewed-on: #174
2024-03-07 23:22:25 +03:00
0c060a5868 use time option
Some checks failed
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
autoapprove / autoapprove (pull_request) Successful in 10s
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
2024-03-07 21:52:54 +03:00
c24f1f26f8 Merge pull request '#131 delete recover' (#172) from kgorbunov/micro-server-grpc:#131 into v3
Some checks failed
build / test (push) Failing after 1m47s
codeql / analyze (go) (push) Failing after 6m1s
build / lint (push) Failing after 18m28s
Reviewed-on: #172
2024-02-27 20:28:59 +03:00
Gorbunov Kirill Andreevich
566036802b #131 delete recover
Some checks failed
autoapprove / autoapprove (pull_request) Successful in 8s
automerge / automerge (pull_request) Has been skipped
dependabot-automerge / automerge (pull_request) Has been skipped
codeql / analyze (go) (pull_request) Has been cancelled
prbuild / test (pull_request) Has been cancelled
prbuild / lint (pull_request) Has been cancelled
2024-02-27 17:08:08 +03:00
f33595f72a dont log error in server
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-19 02:11:29 +03:00
f94c265c7a optimize unknown handler
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-11 22:17:55 +03:00
5 changed files with 76 additions and 116 deletions

24
.gitignore vendored Normal file
View File

@@ -0,0 +1,24 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
bin
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
# Go workspace file
go.work
# General
.DS_Store
.idea
.vscode

16
go.mod
View File

@@ -3,15 +3,15 @@ module go.unistack.org/micro-server-grpc/v3
go 1.20
require (
github.com/golang/protobuf v1.5.3
go.unistack.org/micro/v3 v3.10.31
golang.org/x/net v0.17.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
github.com/golang/protobuf v1.5.4
go.unistack.org/micro/v3 v3.10.52
golang.org/x/net v0.22.0
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
)
require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect
)

22
go.sum
View File

@@ -1,24 +1,38 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
go.unistack.org/micro/v3 v3.10.31 h1:H5mowuB5i3AjG4GOj//V9oTldFZqfKtd5gJD+TeZYCM=
go.unistack.org/micro/v3 v3.10.31/go.mod h1:eUgtvbtiiz6te93m0ZdmoecbitWwjdBmmr84srmEIKA=
go.unistack.org/micro/v3 v3.10.42 h1:A0nA6WT6wNq5fyQyzliX70Bj5/SGj5kadLSOySX4hro=
go.unistack.org/micro/v3 v3.10.42/go.mod h1:CSmEf5ddmft94MyKHnUSMM0W5dpmmTVbgImbgQWV5Ak=
go.unistack.org/micro/v3 v3.10.52 h1:6LlAvLLlf+3JLCEQEVNQWi7DXCoI1ocuOqqoEPj5S+k=
go.unistack.org/micro/v3 v3.10.52/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c h1:lfpJ/2rWPa/kJgxyyXM8PrNnfCzcmxJ265mADgwmvLI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=

104
grpc.go
View File

@@ -1,5 +1,5 @@
// Package grpc provides a grpc server
package grpc // import "go.unistack.org/micro-server-grpc/v3"
package grpc
import (
"context"
@@ -7,7 +7,6 @@ import (
"fmt"
"net"
"reflect"
"runtime/debug"
"sort"
"strconv"
"strings"
@@ -20,9 +19,10 @@ import (
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
metadata "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -48,11 +48,12 @@ type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *sync.WaitGroup
wg *msync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
sync.RWMutex
started bool
registered bool
@@ -117,8 +118,6 @@ func (g *Server) configure(opts ...server.Option) error {
return err
}
g.wg = g.opts.Wait
if g.opts.Context != nil {
if codecs, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs {
@@ -157,6 +156,10 @@ func (g *Server) configure(opts ...server.Option) error {
g.reflection = v
}
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
g.unknownHandler = h
}
if restart {
return g.Start()
}
@@ -199,29 +202,9 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
return status.New(codes.InvalidArgument, err.Error()).Err()
}
defer func() {
if r := recover(); r != nil {
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "panic in %s.%s recovered: %v", serviceName, methodName, r)
config.Logger.Error(config.Context, string(debug.Stack()))
}
err = errors.InternalServerError(g.opts.Name, "panic in %s.%s recovered: %v", serviceName, methodName, r)
} else if err != nil {
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "grpc handler %s.%s got error: %s", serviceName, methodName, err)
}
}
}()
if g.wg != nil {
g.wg.Add(1)
defer g.wg.Done()
if g.opts.Wait != nil {
g.opts.Wait.Add(1)
defer g.opts.Wait.Done()
}
// get grpc metadata
@@ -319,20 +302,16 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
*/
if svc == nil {
if g.opts.Context != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
return h(srv, stream)
}
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
}
mtype := svc.method[methodName]
if mtype == nil {
if g.opts.Context != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
return h(srv, stream)
}
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
}
@@ -447,53 +426,8 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
}
return status.New(statusCode, statusDesc).Err()
// }
}
/*
type reflectStream struct {
stream server.Stream
}
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
return s.stream.Send(rsp)
}
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
req := &grpcreflect.ServerReflectionRequest{}
err := s.stream.Recv(req)
return req, err
}
func (s *reflectStream) SetHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SendHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SetTrailer(gmetadata.MD) {
}
func (s *reflectStream) Context() context.Context {
return s.stream.Context()
}
func (s *reflectStream) SendMsg(m interface{}) error {
return s.stream.Send(m)
}
func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m)
}
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream})
}
*/
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
opts := g.opts
@@ -918,8 +852,8 @@ func (g *Server) Start() error {
}
// wait for waitgroup
if g.wg != nil {
g.wg.Wait()
if g.opts.Wait != nil {
g.opts.Wait.Wait()
}
// stop the grpc server
@@ -932,7 +866,7 @@ func (g *Server) Start() error {
select {
case <-exit:
case <-time.After(time.Second):
case <-time.After(g.opts.GracefulTimeout):
g.srv.Stop()
}

View File

@@ -4,12 +4,9 @@ import (
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"go.unistack.org/micro/v3/broker"
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/server"
@@ -104,15 +101,6 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
if g.opts.Logger.V(logger.ErrorLevel) {
g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r)
g.opts.Logger.Error(g.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msg := p.Message()
// if we don't have headers, create empty map