Compare commits

..

2 Commits

Author SHA1 Message Date
f33595f72a dont log error in server
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-19 02:11:29 +03:00
f94c265c7a optimize unknown handler
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-02-11 22:17:55 +03:00

85
grpc.go
View File

@@ -45,14 +45,15 @@ type ServerReflection struct {
*/
type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *sync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *sync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
sync.RWMutex
started bool
registered bool
@@ -157,6 +158,10 @@ func (g *Server) configure(opts ...server.Option) error {
g.reflection = v
}
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
g.unknownHandler = h
}
if restart {
return g.Start()
}
@@ -209,13 +214,6 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
config.Logger.Error(config.Context, string(debug.Stack()))
}
err = errors.InternalServerError(g.opts.Name, "panic in %s.%s recovered: %v", serviceName, methodName, r)
} else if err != nil {
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "grpc handler %s.%s got error: %s", serviceName, methodName, err)
}
}
}()
@@ -319,20 +317,16 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
*/
if svc == nil {
if g.opts.Context != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
return h(srv, stream)
}
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
}
mtype := svc.method[methodName]
if mtype == nil {
if g.opts.Context != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
return h(srv, stream)
}
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
}
@@ -447,53 +441,8 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
}
return status.New(statusCode, statusDesc).Err()
// }
}
/*
type reflectStream struct {
stream server.Stream
}
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
return s.stream.Send(rsp)
}
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
req := &grpcreflect.ServerReflectionRequest{}
err := s.stream.Recv(req)
return req, err
}
func (s *reflectStream) SetHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SendHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SetTrailer(gmetadata.MD) {
}
func (s *reflectStream) Context() context.Context {
return s.stream.Context()
}
func (s *reflectStream) SendMsg(m interface{}) error {
return s.stream.Send(m)
}
func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m)
}
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream})
}
*/
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
opts := g.opts