optimize unknown handler
Some checks failed
build / test (push) Has been cancelled
build / lint (push) Has been cancelled
codeql / analyze (go) (push) Has been cancelled

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-02-11 22:17:55 +03:00
parent 3036359547
commit f94c265c7a

78
grpc.go
View File

@ -45,14 +45,15 @@ type ServerReflection struct {
*/ */
type Server struct { type Server struct {
handlers map[string]server.Handler handlers map[string]server.Handler
srv *grpc.Server srv *grpc.Server
exit chan chan error exit chan chan error
wg *sync.WaitGroup wg *sync.WaitGroup
rsvc *register.Service rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
rpc *rServer rpc *rServer
opts server.Options opts server.Options
unknownHandler grpc.StreamHandler
sync.RWMutex sync.RWMutex
started bool started bool
registered bool registered bool
@ -157,6 +158,10 @@ func (g *Server) configure(opts ...server.Option) error {
g.reflection = v g.reflection = v
} }
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
g.unknownHandler = h
}
if restart { if restart {
return g.Start() return g.Start()
} }
@ -319,20 +324,16 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
*/ */
if svc == nil { if svc == nil {
if g.opts.Context != nil { if g.unknownHandler != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok { return g.unknownHandler(srv, stream)
return h(srv, stream)
}
} }
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
} }
mtype := svc.method[methodName] mtype := svc.method[methodName]
if mtype == nil { if mtype == nil {
if g.opts.Context != nil { if g.unknownHandler != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok { return g.unknownHandler(srv, stream)
return h(srv, stream)
}
} }
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err() return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
} }
@ -447,53 +448,8 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
} }
return status.New(statusCode, statusDesc).Err() return status.New(statusCode, statusDesc).Err()
// }
} }
/*
type reflectStream struct {
stream server.Stream
}
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
return s.stream.Send(rsp)
}
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
req := &grpcreflect.ServerReflectionRequest{}
err := s.stream.Recv(req)
return req, err
}
func (s *reflectStream) SetHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SendHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SetTrailer(gmetadata.MD) {
}
func (s *reflectStream) Context() context.Context {
return s.stream.Context()
}
func (s *reflectStream) SendMsg(m interface{}) error {
return s.stream.Send(m)
}
func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m)
}
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream})
}
*/
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
opts := g.opts opts := g.opts