diff --git a/grpc.go b/grpc.go index e662cea..ad1076a 100644 --- a/grpc.go +++ b/grpc.go @@ -45,14 +45,15 @@ type ServerReflection struct { */ type Server struct { - handlers map[string]server.Handler - srv *grpc.Server - exit chan chan error - wg *sync.WaitGroup - rsvc *register.Service - subscribers map[*subscriber][]broker.Subscriber - rpc *rServer - opts server.Options + handlers map[string]server.Handler + srv *grpc.Server + exit chan chan error + wg *sync.WaitGroup + rsvc *register.Service + subscribers map[*subscriber][]broker.Subscriber + rpc *rServer + opts server.Options + unknownHandler grpc.StreamHandler sync.RWMutex started bool registered bool @@ -157,6 +158,10 @@ func (g *Server) configure(opts ...server.Option) error { g.reflection = v } + if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok { + g.unknownHandler = h + } + if restart { return g.Start() } @@ -316,20 +321,16 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) */ if svc == nil { - if g.opts.Context != nil { - if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok { - return h(srv, stream) - } + if g.unknownHandler != nil { + return g.unknownHandler(srv, stream) } return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() } mtype := svc.method[methodName] if mtype == nil { - if g.opts.Context != nil { - if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok { - return h(srv, stream) - } + if g.unknownHandler != nil { + return g.unknownHandler(srv, stream) } return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err() } @@ -444,53 +445,8 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s } return status.New(statusCode, statusDesc).Err() - // } } -/* -type reflectStream struct { - stream server.Stream -} - -func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error { - return s.stream.Send(rsp) -} - -func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) { - req := &grpcreflect.ServerReflectionRequest{} - err := s.stream.Recv(req) - return req, err -} - -func (s *reflectStream) SetHeader(gmetadata.MD) error { - return nil -} - -func (s *reflectStream) SendHeader(gmetadata.MD) error { - return nil -} - -func (s *reflectStream) SetTrailer(gmetadata.MD) { - -} - -func (s *reflectStream) Context() context.Context { - return s.stream.Context() -} - -func (s *reflectStream) SendMsg(m interface{}) error { - return s.stream.Send(m) -} - -func (s *reflectStream) RecvMsg(m interface{}) error { - return s.stream.Recv(m) -} - -func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { - return g.s.ServerReflectionInfo(&reflectStream{stream}) -} -*/ - func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { opts := g.opts