Merge pull request 'combo prepare' (#162) from init-fix into v3
Reviewed-on: #162
This commit is contained in:
commit
832f1034a8
68
grpc.go
68
grpc.go
@ -39,13 +39,13 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
type grpcServerReflection struct {
|
type ServerReflection struct {
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
s *serverReflectionServer
|
s *serverReflectionServer
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
type grpcServer struct {
|
type Server struct {
|
||||||
handlers map[string]server.Handler
|
handlers map[string]server.Handler
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
exit chan chan error
|
exit chan chan error
|
||||||
@ -60,9 +60,9 @@ type grpcServer struct {
|
|||||||
reflection bool
|
reflection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGRPCServer(opts ...server.Option) server.Server {
|
func newServer(opts ...server.Option) *Server {
|
||||||
// create a grpc server
|
// create a grpc server
|
||||||
g := &grpcServer{
|
g := &Server{
|
||||||
opts: server.NewOptions(opts...),
|
opts: server.NewOptions(opts...),
|
||||||
rpc: &rServer{
|
rpc: &rServer{
|
||||||
serviceMap: make(map[string]*service),
|
serviceMap: make(map[string]*service),
|
||||||
@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
func (g *grpcServer) configure(opts ...server.Option) error {
|
func (g *Server) configure(opts ...server.Option) error {
|
||||||
g.Lock()
|
g.Lock()
|
||||||
defer g.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
@ -128,6 +128,10 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, k := range g.opts.Codecs {
|
||||||
|
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||||
|
}
|
||||||
|
|
||||||
maxMsgSize := g.getMaxMsgSize()
|
maxMsgSize := g.getMaxMsgSize()
|
||||||
|
|
||||||
gopts := []grpc.ServerOption{
|
gopts := []grpc.ServerOption{
|
||||||
@ -165,7 +169,7 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) getMaxMsgSize() int {
|
func (g *Server) getMaxMsgSize() int {
|
||||||
if g.opts.Context == nil {
|
if g.opts.Context == nil {
|
||||||
return codec.DefaultMaxMsgSize
|
return codec.DefaultMaxMsgSize
|
||||||
}
|
}
|
||||||
@ -176,14 +180,14 @@ func (g *grpcServer) getMaxMsgSize() int {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) getCredentials() credentials.TransportCredentials {
|
func (g *Server) getCredentials() credentials.TransportCredentials {
|
||||||
if g.opts.TLSConfig != nil {
|
if g.opts.TLSConfig != nil {
|
||||||
return credentials.NewTLS(g.opts.TLSConfig)
|
return credentials.NewTLS(g.opts.TLSConfig)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
||||||
if g.opts.Context == nil {
|
if g.opts.Context == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -196,7 +200,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
|||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||||
if !ok {
|
if !ok {
|
||||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||||
@ -302,7 +306,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
||||||
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||||
svc = &service{}
|
svc = &service{}
|
||||||
svc.typ = reflect.TypeOf(rfl)
|
svc.typ = reflect.TypeOf(rfl)
|
||||||
svc.rcvr = reflect.ValueOf(rfl)
|
svc.rcvr = reflect.ValueOf(rfl)
|
||||||
@ -350,7 +354,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
|||||||
return g.processStream(ctx, stream, svc, mtype, ct)
|
return g.processStream(ctx, stream, svc, mtype, ct)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
// for {
|
// for {
|
||||||
var err error
|
var err error
|
||||||
var argv, replyv reflect.Value
|
var argv, replyv reflect.Value
|
||||||
@ -493,12 +497,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error {
|
|||||||
return s.stream.Recv(m)
|
return s.stream.Recv(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||||
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
|
|
||||||
r := &rpcRequest{
|
r := &rpcRequest{
|
||||||
@ -572,7 +576,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
|
|||||||
return status.New(statusCode, statusDesc).Err()
|
return status.New(statusCode, statusDesc).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
func (g *Server) newCodec(ct string) (codec.Codec, error) {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
defer g.RUnlock()
|
defer g.RUnlock()
|
||||||
|
|
||||||
@ -587,7 +591,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
|||||||
return nil, codec.ErrUnknownContentType
|
return nil, codec.ErrUnknownContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Options() server.Options {
|
func (g *Server) Options() server.Options {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
g.RUnlock()
|
g.RUnlock()
|
||||||
@ -595,15 +599,15 @@ func (g *grpcServer) Options() server.Options {
|
|||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Init(opts ...server.Option) error {
|
func (g *Server) Init(opts ...server.Option) error {
|
||||||
return g.configure(opts...)
|
return g.configure(opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||||
return newRPCHandler(h, opts...)
|
return newRPCHandler(h, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Handle(h server.Handler) error {
|
func (g *Server) Handle(h server.Handler) error {
|
||||||
if err := g.rpc.register(h.Handler()); err != nil {
|
if err := g.rpc.register(h.Handler()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -612,11 +616,11 @@ func (g *grpcServer) Handle(h server.Handler) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||||
return newSubscriber(topic, sb, opts...)
|
return newSubscriber(topic, sb, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
func (g *Server) Subscribe(sb server.Subscriber) error {
|
||||||
sub, ok := sb.(*subscriber)
|
sub, ok := sb.(*subscriber)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
||||||
@ -640,7 +644,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Register() error {
|
func (g *Server) Register() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
rsvc := g.rsvc
|
rsvc := g.rsvc
|
||||||
config := g.opts
|
config := g.opts
|
||||||
@ -745,7 +749,7 @@ func (g *grpcServer) Register() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Deregister() error {
|
func (g *Server) Deregister() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
g.RLock()
|
g.RLock()
|
||||||
@ -799,7 +803,7 @@ func (g *grpcServer) Deregister() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Start() error {
|
func (g *Server) Start() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if g.started {
|
if g.started {
|
||||||
g.RUnlock()
|
g.RUnlock()
|
||||||
@ -809,10 +813,6 @@ func (g *grpcServer) Start() error {
|
|||||||
|
|
||||||
config := g.Options()
|
config := g.Options()
|
||||||
|
|
||||||
for _, k := range config.Codecs {
|
|
||||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
|
||||||
}
|
|
||||||
|
|
||||||
// micro: config.Transport.Listen(config.Address)
|
// micro: config.Transport.Listen(config.Address)
|
||||||
var ts net.Listener
|
var ts net.Listener
|
||||||
var err error
|
var err error
|
||||||
@ -987,7 +987,7 @@ func (g *grpcServer) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Stop() error {
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
g.RUnlock()
|
g.RUnlock()
|
||||||
@ -1007,14 +1007,18 @@ func (g *grpcServer) Stop() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) String() string {
|
func (g *Server) String() string {
|
||||||
return "grpc"
|
return "grpc"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Name() string {
|
func (g *Server) Name() string {
|
||||||
return g.opts.Name
|
return g.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(opts ...server.Option) server.Server {
|
func (g *Server) GRPCServer() *grpc.Server {
|
||||||
return newGRPCServer(opts...)
|
return g.srv
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(opts ...server.Option) *Server {
|
||||||
|
return newServer(opts...)
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user