diff --git a/grpc.go b/grpc.go index 2be599c..da533d4 100644 --- a/grpc.go +++ b/grpc.go @@ -39,13 +39,13 @@ const ( ) /* -type grpcServerReflection struct { +type ServerReflection struct { srv *grpc.Server s *serverReflectionServer } */ -type grpcServer struct { +type Server struct { handlers map[string]server.Handler srv *grpc.Server exit chan chan error @@ -60,9 +60,9 @@ type grpcServer struct { reflection bool } -func newGRPCServer(opts ...server.Option) server.Server { +func newServer(opts ...server.Option) *Server { // create a grpc server - g := &grpcServer{ + g := &Server{ opts: server.NewOptions(opts...), rpc: &rServer{ serviceMap: make(map[string]*service), @@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se */ -func (g *grpcServer) configure(opts ...server.Option) error { +func (g *Server) configure(opts ...server.Option) error { g.Lock() defer g.Unlock() @@ -128,6 +128,10 @@ func (g *grpcServer) configure(opts ...server.Option) error { } } + for _, k := range g.opts.Codecs { + encoding.RegisterCodec(&wrapMicroCodec{k}) + } + maxMsgSize := g.getMaxMsgSize() gopts := []grpc.ServerOption{ @@ -165,7 +169,7 @@ func (g *grpcServer) configure(opts ...server.Option) error { return nil } -func (g *grpcServer) getMaxMsgSize() int { +func (g *Server) getMaxMsgSize() int { if g.opts.Context == nil { return codec.DefaultMaxMsgSize } @@ -176,14 +180,14 @@ func (g *grpcServer) getMaxMsgSize() int { return s } -func (g *grpcServer) getCredentials() credentials.TransportCredentials { +func (g *Server) getCredentials() credentials.TransportCredentials { if g.opts.TLSConfig != nil { return credentials.NewTLS(g.opts.TLSConfig) } return nil } -func (g *grpcServer) getGrpcOptions() []grpc.ServerOption { +func (g *Server) getGrpcOptions() []grpc.ServerOption { if g.opts.Context == nil { return nil } @@ -196,7 +200,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption { return opts } -func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) { +func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) { fullMethod, ok := grpc.MethodFromServerStream(stream) if !ok { return status.Errorf(codes.Internal, "method does not exist in context") @@ -302,7 +306,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err /* if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { - rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} + rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} svc = &service{} svc.typ = reflect.TypeOf(rfl) svc.rcvr = reflect.ValueOf(rfl) @@ -350,7 +354,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err return g.processStream(ctx, stream, svc, mtype, ct) } -func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { +func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { // for { var err error var argv, replyv reflect.Value @@ -493,12 +497,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error { return s.stream.Recv(m) } -func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { +func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error { return g.s.ServerReflectionInfo(&reflectStream{stream}) } */ -func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { +func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { opts := g.opts r := &rpcRequest{ @@ -572,7 +576,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream return status.New(statusCode, statusDesc).Err() } -func (g *grpcServer) newCodec(ct string) (codec.Codec, error) { +func (g *Server) newCodec(ct string) (codec.Codec, error) { g.RLock() defer g.RUnlock() @@ -587,7 +591,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) { return nil, codec.ErrUnknownContentType } -func (g *grpcServer) Options() server.Options { +func (g *Server) Options() server.Options { g.RLock() opts := g.opts g.RUnlock() @@ -595,15 +599,15 @@ func (g *grpcServer) Options() server.Options { return opts } -func (g *grpcServer) Init(opts ...server.Option) error { +func (g *Server) Init(opts ...server.Option) error { return g.configure(opts...) } -func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { +func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler { return newRPCHandler(h, opts...) } -func (g *grpcServer) Handle(h server.Handler) error { +func (g *Server) Handle(h server.Handler) error { if err := g.rpc.register(h.Handler()); err != nil { return err } @@ -612,11 +616,11 @@ func (g *grpcServer) Handle(h server.Handler) error { return nil } -func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { +func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber { return newSubscriber(topic, sb, opts...) } -func (g *grpcServer) Subscribe(sb server.Subscriber) error { +func (g *Server) Subscribe(sb server.Subscriber) error { sub, ok := sb.(*subscriber) if !ok { return fmt.Errorf("invalid subscriber: expected *subscriber") @@ -640,7 +644,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error { return nil } -func (g *grpcServer) Register() error { +func (g *Server) Register() error { g.RLock() rsvc := g.rsvc config := g.opts @@ -745,7 +749,7 @@ func (g *grpcServer) Register() error { return nil } -func (g *grpcServer) Deregister() error { +func (g *Server) Deregister() error { var err error g.RLock() @@ -799,7 +803,7 @@ func (g *grpcServer) Deregister() error { return nil } -func (g *grpcServer) Start() error { +func (g *Server) Start() error { g.RLock() if g.started { g.RUnlock() @@ -809,10 +813,6 @@ func (g *grpcServer) Start() error { config := g.Options() - for _, k := range config.Codecs { - encoding.RegisterCodec(&wrapMicroCodec{k}) - } - // micro: config.Transport.Listen(config.Address) var ts net.Listener var err error @@ -987,7 +987,7 @@ func (g *grpcServer) Start() error { return nil } -func (g *grpcServer) Stop() error { +func (g *Server) Stop() error { g.RLock() if !g.started { g.RUnlock() @@ -1007,14 +1007,18 @@ func (g *grpcServer) Stop() error { return err } -func (g *grpcServer) String() string { +func (g *Server) String() string { return "grpc" } -func (g *grpcServer) Name() string { +func (g *Server) Name() string { return g.opts.Name } -func NewServer(opts ...server.Option) server.Server { - return newGRPCServer(opts...) +func (g *Server) GRPCServer() *grpc.Server { + return g.srv +} + +func NewServer(opts ...server.Option) *Server { + return newServer(opts...) } diff --git a/subscriber.go b/subscriber.go index b29e691..94f11f5 100644 --- a/subscriber.go +++ b/subscriber.go @@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio } } -func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { +func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { return func(p broker.Event) (err error) { defer func() { if r := recover(); r != nil {