combo prepare #162
68
grpc.go
68
grpc.go
@ -39,13 +39,13 @@ const (
|
||||
)
|
||||
|
||||
/*
|
||||
type grpcServerReflection struct {
|
||||
type ServerReflection struct {
|
||||
srv *grpc.Server
|
||||
s *serverReflectionServer
|
||||
}
|
||||
*/
|
||||
|
||||
type grpcServer struct {
|
||||
type Server struct {
|
||||
handlers map[string]server.Handler
|
||||
srv *grpc.Server
|
||||
exit chan chan error
|
||||
@ -60,9 +60,9 @@ type grpcServer struct {
|
||||
reflection bool
|
||||
}
|
||||
|
||||
func newGRPCServer(opts ...server.Option) server.Server {
|
||||
func newServer(opts ...server.Option) *Server {
|
||||
// create a grpc server
|
||||
g := &grpcServer{
|
||||
g := &Server{
|
||||
opts: server.NewOptions(opts...),
|
||||
rpc: &rServer{
|
||||
serviceMap: make(map[string]*service),
|
||||
@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
|
||||
|
||||
*/
|
||||
|
||||
func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
func (g *Server) configure(opts ...server.Option) error {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
@ -128,6 +128,10 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
}
|
||||
}
|
||||
|
||||
for _, k := range g.opts.Codecs {
|
||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||
}
|
||||
|
||||
maxMsgSize := g.getMaxMsgSize()
|
||||
|
||||
gopts := []grpc.ServerOption{
|
||||
@ -165,7 +169,7 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) getMaxMsgSize() int {
|
||||
func (g *Server) getMaxMsgSize() int {
|
||||
if g.opts.Context == nil {
|
||||
return codec.DefaultMaxMsgSize
|
||||
}
|
||||
@ -176,14 +180,14 @@ func (g *grpcServer) getMaxMsgSize() int {
|
||||
return s
|
||||
}
|
||||
|
||||
func (g *grpcServer) getCredentials() credentials.TransportCredentials {
|
||||
func (g *Server) getCredentials() credentials.TransportCredentials {
|
||||
if g.opts.TLSConfig != nil {
|
||||
return credentials.NewTLS(g.opts.TLSConfig)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
||||
func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
||||
if g.opts.Context == nil {
|
||||
return nil
|
||||
}
|
||||
@ -196,7 +200,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
||||
return opts
|
||||
}
|
||||
|
||||
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||
if !ok {
|
||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||
@ -302,7 +306,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
|
||||
/*
|
||||
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
||||
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||
svc = &service{}
|
||||
svc.typ = reflect.TypeOf(rfl)
|
||||
svc.rcvr = reflect.ValueOf(rfl)
|
||||
@ -350,7 +354,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
return g.processStream(ctx, stream, svc, mtype, ct)
|
||||
}
|
||||
|
||||
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
// for {
|
||||
var err error
|
||||
var argv, replyv reflect.Value
|
||||
@ -493,12 +497,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error {
|
||||
return s.stream.Recv(m)
|
||||
}
|
||||
|
||||
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
||||
}
|
||||
*/
|
||||
|
||||
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
opts := g.opts
|
||||
|
||||
r := &rpcRequest{
|
||||
@ -572,7 +576,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
|
||||
return status.New(statusCode, statusDesc).Err()
|
||||
}
|
||||
|
||||
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
||||
func (g *Server) newCodec(ct string) (codec.Codec, error) {
|
||||
g.RLock()
|
||||
defer g.RUnlock()
|
||||
|
||||
@ -587,7 +591,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
||||
return nil, codec.ErrUnknownContentType
|
||||
}
|
||||
|
||||
func (g *grpcServer) Options() server.Options {
|
||||
func (g *Server) Options() server.Options {
|
||||
g.RLock()
|
||||
opts := g.opts
|
||||
g.RUnlock()
|
||||
@ -595,15 +599,15 @@ func (g *grpcServer) Options() server.Options {
|
||||
return opts
|
||||
}
|
||||
|
||||
func (g *grpcServer) Init(opts ...server.Option) error {
|
||||
func (g *Server) Init(opts ...server.Option) error {
|
||||
return g.configure(opts...)
|
||||
}
|
||||
|
||||
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
return newRPCHandler(h, opts...)
|
||||
}
|
||||
|
||||
func (g *grpcServer) Handle(h server.Handler) error {
|
||||
func (g *Server) Handle(h server.Handler) error {
|
||||
if err := g.rpc.register(h.Handler()); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -612,11 +616,11 @@ func (g *grpcServer) Handle(h server.Handler) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
return newSubscriber(topic, sb, opts...)
|
||||
}
|
||||
|
||||
func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
||||
func (g *Server) Subscribe(sb server.Subscriber) error {
|
||||
sub, ok := sb.(*subscriber)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
||||
@ -640,7 +644,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Register() error {
|
||||
func (g *Server) Register() error {
|
||||
g.RLock()
|
||||
rsvc := g.rsvc
|
||||
config := g.opts
|
||||
@ -745,7 +749,7 @@ func (g *grpcServer) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Deregister() error {
|
||||
func (g *Server) Deregister() error {
|
||||
var err error
|
||||
|
||||
g.RLock()
|
||||
@ -799,7 +803,7 @@ func (g *grpcServer) Deregister() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Start() error {
|
||||
func (g *Server) Start() error {
|
||||
g.RLock()
|
||||
if g.started {
|
||||
g.RUnlock()
|
||||
@ -809,10 +813,6 @@ func (g *grpcServer) Start() error {
|
||||
|
||||
config := g.Options()
|
||||
|
||||
for _, k := range config.Codecs {
|
||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||
}
|
||||
|
||||
// micro: config.Transport.Listen(config.Address)
|
||||
var ts net.Listener
|
||||
var err error
|
||||
@ -987,7 +987,7 @@ func (g *grpcServer) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Stop() error {
|
||||
func (g *Server) Stop() error {
|
||||
g.RLock()
|
||||
if !g.started {
|
||||
g.RUnlock()
|
||||
@ -1007,14 +1007,18 @@ func (g *grpcServer) Stop() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *grpcServer) String() string {
|
||||
func (g *Server) String() string {
|
||||
return "grpc"
|
||||
}
|
||||
|
||||
func (g *grpcServer) Name() string {
|
||||
func (g *Server) Name() string {
|
||||
return g.opts.Name
|
||||
}
|
||||
|
||||
func NewServer(opts ...server.Option) server.Server {
|
||||
return newGRPCServer(opts...)
|
||||
func (g *Server) GRPCServer() *grpc.Server {
|
||||
return g.srv
|
||||
}
|
||||
|
||||
func NewServer(opts ...server.Option) *Server {
|
||||
return newServer(opts...)
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
||||
}
|
||||
}
|
||||
|
||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user