allow to expose *grpc.Server

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2023-03-03 13:08:06 +03:00
parent dc8a736e13
commit 92dcd1acd7
2 changed files with 33 additions and 29 deletions

60
grpc.go
View File

@ -39,13 +39,13 @@ const (
) )
/* /*
type grpcServerReflection struct { type ServerReflection struct {
srv *grpc.Server srv *grpc.Server
s *serverReflectionServer s *serverReflectionServer
} }
*/ */
type grpcServer struct { type Server struct {
handlers map[string]server.Handler handlers map[string]server.Handler
srv *grpc.Server srv *grpc.Server
exit chan chan error exit chan chan error
@ -60,9 +60,9 @@ type grpcServer struct {
reflection bool reflection bool
} }
func newGRPCServer(opts ...server.Option) server.Server { func newServer(opts ...server.Option) *Server {
// create a grpc server // create a grpc server
g := &grpcServer{ g := &Server{
opts: server.NewOptions(opts...), opts: server.NewOptions(opts...),
rpc: &rServer{ rpc: &rServer{
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
*/ */
func (g *grpcServer) configure(opts ...server.Option) error { func (g *Server) configure(opts ...server.Option) error {
g.Lock() g.Lock()
defer g.Unlock() defer g.Unlock()
@ -165,7 +165,7 @@ func (g *grpcServer) configure(opts ...server.Option) error {
return nil return nil
} }
func (g *grpcServer) getMaxMsgSize() int { func (g *Server) getMaxMsgSize() int {
if g.opts.Context == nil { if g.opts.Context == nil {
return codec.DefaultMaxMsgSize return codec.DefaultMaxMsgSize
} }
@ -176,14 +176,14 @@ func (g *grpcServer) getMaxMsgSize() int {
return s return s
} }
func (g *grpcServer) getCredentials() credentials.TransportCredentials { func (g *Server) getCredentials() credentials.TransportCredentials {
if g.opts.TLSConfig != nil { if g.opts.TLSConfig != nil {
return credentials.NewTLS(g.opts.TLSConfig) return credentials.NewTLS(g.opts.TLSConfig)
} }
return nil return nil
} }
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption { func (g *Server) getGrpcOptions() []grpc.ServerOption {
if g.opts.Context == nil { if g.opts.Context == nil {
return nil return nil
} }
@ -196,7 +196,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
return opts return opts
} }
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) { func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
fullMethod, ok := grpc.MethodFromServerStream(stream) fullMethod, ok := grpc.MethodFromServerStream(stream)
if !ok { if !ok {
return status.Errorf(codes.Internal, "method does not exist in context") return status.Errorf(codes.Internal, "method does not exist in context")
@ -302,7 +302,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
/* /*
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
svc = &service{} svc = &service{}
svc.typ = reflect.TypeOf(rfl) svc.typ = reflect.TypeOf(rfl)
svc.rcvr = reflect.ValueOf(rfl) svc.rcvr = reflect.ValueOf(rfl)
@ -350,7 +350,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
return g.processStream(ctx, stream, svc, mtype, ct) return g.processStream(ctx, stream, svc, mtype, ct)
} }
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
// for { // for {
var err error var err error
var argv, replyv reflect.Value var argv, replyv reflect.Value
@ -493,12 +493,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m) return s.stream.Recv(m)
} }
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream}) return g.s.ServerReflectionInfo(&reflectStream{stream})
} }
*/ */
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
opts := g.opts opts := g.opts
r := &rpcRequest{ r := &rpcRequest{
@ -572,7 +572,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
return status.New(statusCode, statusDesc).Err() return status.New(statusCode, statusDesc).Err()
} }
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) { func (g *Server) newCodec(ct string) (codec.Codec, error) {
g.RLock() g.RLock()
defer g.RUnlock() defer g.RUnlock()
@ -587,7 +587,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType return nil, codec.ErrUnknownContentType
} }
func (g *grpcServer) Options() server.Options { func (g *Server) Options() server.Options {
g.RLock() g.RLock()
opts := g.opts opts := g.opts
g.RUnlock() g.RUnlock()
@ -595,15 +595,15 @@ func (g *grpcServer) Options() server.Options {
return opts return opts
} }
func (g *grpcServer) Init(opts ...server.Option) error { func (g *Server) Init(opts ...server.Option) error {
return g.configure(opts...) return g.configure(opts...)
} }
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
return newRPCHandler(h, opts...) return newRPCHandler(h, opts...)
} }
func (g *grpcServer) Handle(h server.Handler) error { func (g *Server) Handle(h server.Handler) error {
if err := g.rpc.register(h.Handler()); err != nil { if err := g.rpc.register(h.Handler()); err != nil {
return err return err
} }
@ -612,11 +612,11 @@ func (g *grpcServer) Handle(h server.Handler) error {
return nil return nil
} }
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...) return newSubscriber(topic, sb, opts...)
} }
func (g *grpcServer) Subscribe(sb server.Subscriber) error { func (g *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber) sub, ok := sb.(*subscriber)
if !ok { if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber") return fmt.Errorf("invalid subscriber: expected *subscriber")
@ -640,7 +640,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
return nil return nil
} }
func (g *grpcServer) Register() error { func (g *Server) Register() error {
g.RLock() g.RLock()
rsvc := g.rsvc rsvc := g.rsvc
config := g.opts config := g.opts
@ -745,7 +745,7 @@ func (g *grpcServer) Register() error {
return nil return nil
} }
func (g *grpcServer) Deregister() error { func (g *Server) Deregister() error {
var err error var err error
g.RLock() g.RLock()
@ -799,7 +799,7 @@ func (g *grpcServer) Deregister() error {
return nil return nil
} }
func (g *grpcServer) Start() error { func (g *Server) Start() error {
g.RLock() g.RLock()
if g.started { if g.started {
g.RUnlock() g.RUnlock()
@ -987,7 +987,7 @@ func (g *grpcServer) Start() error {
return nil return nil
} }
func (g *grpcServer) Stop() error { func (g *Server) Stop() error {
g.RLock() g.RLock()
if !g.started { if !g.started {
g.RUnlock() g.RUnlock()
@ -1007,14 +1007,18 @@ func (g *grpcServer) Stop() error {
return err return err
} }
func (g *grpcServer) String() string { func (g *Server) String() string {
return "grpc" return "grpc"
} }
func (g *grpcServer) Name() string { func (g *Server) Name() string {
return g.opts.Name return g.opts.Name
} }
func NewServer(opts ...server.Option) server.Server { func (g *Server) GRPCServer() *grpc.Server {
return newGRPCServer(opts...) return g.srv
}
func NewServer(opts ...server.Option) *Server {
return newServer(opts...)
} }

View File

@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
} }
} }
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) { return func(p broker.Event) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {