optimize unknown handler
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
4003d714ba
commit
9907ff90f5
78
grpc.go
78
grpc.go
@ -45,14 +45,15 @@ type ServerReflection struct {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
handlers map[string]server.Handler
|
handlers map[string]server.Handler
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
exit chan chan error
|
exit chan chan error
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
rsvc *register.Service
|
rsvc *register.Service
|
||||||
subscribers map[*subscriber][]broker.Subscriber
|
subscribers map[*subscriber][]broker.Subscriber
|
||||||
rpc *rServer
|
rpc *rServer
|
||||||
opts server.Options
|
opts server.Options
|
||||||
|
unknownHandler grpc.StreamHandler
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
started bool
|
started bool
|
||||||
registered bool
|
registered bool
|
||||||
@ -157,6 +158,10 @@ func (g *Server) configure(opts ...server.Option) error {
|
|||||||
g.reflection = v
|
g.reflection = v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
|
||||||
|
g.unknownHandler = h
|
||||||
|
}
|
||||||
|
|
||||||
if restart {
|
if restart {
|
||||||
return g.Start()
|
return g.Start()
|
||||||
}
|
}
|
||||||
@ -316,20 +321,16 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
if svc == nil {
|
if svc == nil {
|
||||||
if g.opts.Context != nil {
|
if g.unknownHandler != nil {
|
||||||
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
|
return g.unknownHandler(srv, stream)
|
||||||
return h(srv, stream)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
|
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
mtype := svc.method[methodName]
|
mtype := svc.method[methodName]
|
||||||
if mtype == nil {
|
if mtype == nil {
|
||||||
if g.opts.Context != nil {
|
if g.unknownHandler != nil {
|
||||||
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
|
return g.unknownHandler(srv, stream)
|
||||||
return h(srv, stream)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
|
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
|
||||||
}
|
}
|
||||||
@ -444,53 +445,8 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
return status.New(statusCode, statusDesc).Err()
|
return status.New(statusCode, statusDesc).Err()
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
type reflectStream struct {
|
|
||||||
stream server.Stream
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
|
|
||||||
return s.stream.Send(rsp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
|
|
||||||
req := &grpcreflect.ServerReflectionRequest{}
|
|
||||||
err := s.stream.Recv(req)
|
|
||||||
return req, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) SetHeader(gmetadata.MD) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) SendHeader(gmetadata.MD) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) SetTrailer(gmetadata.MD) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) Context() context.Context {
|
|
||||||
return s.stream.Context()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) SendMsg(m interface{}) error {
|
|
||||||
return s.stream.Send(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *reflectStream) RecvMsg(m interface{}) error {
|
|
||||||
return s.stream.Recv(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
|
||||||
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user