combo prepare #162

Merged
vtolstov merged 3 commits from init-fix into v3 2023-03-04 16:28:25 +03:00
2 changed files with 37 additions and 33 deletions

68
grpc.go
View File

@ -39,13 +39,13 @@ const (
)
/*
type grpcServerReflection struct {
type ServerReflection struct {
srv *grpc.Server
s *serverReflectionServer
}
*/
type grpcServer struct {
type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
@ -60,9 +60,9 @@ type grpcServer struct {
reflection bool
}
func newGRPCServer(opts ...server.Option) server.Server {
func newServer(opts ...server.Option) *Server {
// create a grpc server
g := &grpcServer{
g := &Server{
opts: server.NewOptions(opts...),
rpc: &rServer{
serviceMap: make(map[string]*service),
@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
*/
func (g *grpcServer) configure(opts ...server.Option) error {
func (g *Server) configure(opts ...server.Option) error {
g.Lock()
defer g.Unlock()
@ -128,6 +128,10 @@ func (g *grpcServer) configure(opts ...server.Option) error {
}
}
for _, k := range g.opts.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
maxMsgSize := g.getMaxMsgSize()
gopts := []grpc.ServerOption{
@ -165,7 +169,7 @@ func (g *grpcServer) configure(opts ...server.Option) error {
return nil
}
func (g *grpcServer) getMaxMsgSize() int {
func (g *Server) getMaxMsgSize() int {
if g.opts.Context == nil {
return codec.DefaultMaxMsgSize
}
@ -176,14 +180,14 @@ func (g *grpcServer) getMaxMsgSize() int {
return s
}
func (g *grpcServer) getCredentials() credentials.TransportCredentials {
func (g *Server) getCredentials() credentials.TransportCredentials {
if g.opts.TLSConfig != nil {
return credentials.NewTLS(g.opts.TLSConfig)
}
return nil
}
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
func (g *Server) getGrpcOptions() []grpc.ServerOption {
if g.opts.Context == nil {
return nil
}
@ -196,7 +200,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
return opts
}
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
fullMethod, ok := grpc.MethodFromServerStream(stream)
if !ok {
return status.Errorf(codes.Internal, "method does not exist in context")
@ -302,7 +306,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
/*
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
svc = &service{}
svc.typ = reflect.TypeOf(rfl)
svc.rcvr = reflect.ValueOf(rfl)
@ -350,7 +354,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
return g.processStream(ctx, stream, svc, mtype, ct)
}
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
// for {
var err error
var argv, replyv reflect.Value
@ -493,12 +497,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m)
}
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream})
}
*/
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
opts := g.opts
r := &rpcRequest{
@ -572,7 +576,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
return status.New(statusCode, statusDesc).Err()
}
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
func (g *Server) newCodec(ct string) (codec.Codec, error) {
g.RLock()
defer g.RUnlock()
@ -587,7 +591,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
return nil, codec.ErrUnknownContentType
}
func (g *grpcServer) Options() server.Options {
func (g *Server) Options() server.Options {
g.RLock()
opts := g.opts
g.RUnlock()
@ -595,15 +599,15 @@ func (g *grpcServer) Options() server.Options {
return opts
}
func (g *grpcServer) Init(opts ...server.Option) error {
func (g *Server) Init(opts ...server.Option) error {
return g.configure(opts...)
}
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
return newRPCHandler(h, opts...)
}
func (g *grpcServer) Handle(h server.Handler) error {
func (g *Server) Handle(h server.Handler) error {
if err := g.rpc.register(h.Handler()); err != nil {
return err
}
@ -612,11 +616,11 @@ func (g *grpcServer) Handle(h server.Handler) error {
return nil
}
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (g *grpcServer) Subscribe(sb server.Subscriber) error {
func (g *Server) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
@ -640,7 +644,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
return nil
}
func (g *grpcServer) Register() error {
func (g *Server) Register() error {
g.RLock()
rsvc := g.rsvc
config := g.opts
@ -745,7 +749,7 @@ func (g *grpcServer) Register() error {
return nil
}
func (g *grpcServer) Deregister() error {
func (g *Server) Deregister() error {
var err error
g.RLock()
@ -799,7 +803,7 @@ func (g *grpcServer) Deregister() error {
return nil
}
func (g *grpcServer) Start() error {
func (g *Server) Start() error {
g.RLock()
if g.started {
g.RUnlock()
@ -809,10 +813,6 @@ func (g *grpcServer) Start() error {
config := g.Options()
for _, k := range config.Codecs {
encoding.RegisterCodec(&wrapMicroCodec{k})
}
// micro: config.Transport.Listen(config.Address)
var ts net.Listener
var err error
@ -987,7 +987,7 @@ func (g *grpcServer) Start() error {
return nil
}
func (g *grpcServer) Stop() error {
func (g *Server) Stop() error {
g.RLock()
if !g.started {
g.RUnlock()
@ -1007,14 +1007,18 @@ func (g *grpcServer) Stop() error {
return err
}
func (g *grpcServer) String() string {
func (g *Server) String() string {
return "grpc"
}
func (g *grpcServer) Name() string {
func (g *Server) Name() string {
return g.opts.Name
}
func NewServer(opts ...server.Option) server.Server {
return newGRPCServer(opts...)
func (g *Server) GRPCServer() *grpc.Server {
return g.srv
}
func NewServer(opts ...server.Option) *Server {
return newServer(opts...)
}

View File

@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
}
}
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {