Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2021-03-24 15:09:40 +03:00
parent dd87cf534f
commit 72fdde0445
3 changed files with 36 additions and 69 deletions

93
grpc.go
View File

@@ -53,6 +53,7 @@ type grpcServer struct {
opts server.Options
handlers map[string]server.Handler
subscribers map[*subscriber][]broker.Subscriber
init bool
// marks the serve as started
started bool
// used for first registration
@@ -80,19 +81,22 @@ func newGRPCServer(opts ...server.Option) server.Server {
return g
}
/*
type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
}
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
return r.m(ctx, msg)
return r.m(ctx, msg)
}
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
return r.h(ctx, req, rsp)
return r.h(ctx, req, rsp)
}
*/
func (g *grpcServer) configure(opts ...server.Option) error {
g.Lock()
defer g.Unlock()
@@ -101,12 +105,26 @@ func (g *grpcServer) configure(opts ...server.Option) error {
o(&g.opts)
}
if g.opts.Register == nil {
return fmt.Errorf("register not set")
if err := g.opts.Register.Init(); err != nil {
return err
}
if g.opts.Broker == nil {
return fmt.Errorf("broker not set")
if err := g.opts.Broker.Init(); err != nil {
return err
}
if err := g.opts.Tracer.Init(); err != nil {
return err
}
if err := g.opts.Auth.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil {
return err
}
if err := g.opts.Meter.Init(); err != nil {
return err
}
if err := g.opts.Transport.Init(); err != nil {
return err
}
g.wg = g.opts.Wait
@@ -153,6 +171,8 @@ func (g *grpcServer) configure(opts ...server.Option) error {
return g.Start()
}
g.init = true
return nil
}
@@ -270,58 +290,6 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
}
}
// process via router
if g.opts.Router != nil {
cf, err := g.newCodec(ct)
if err != nil {
return errors.InternalServerError(g.opts.Name, err.Error())
}
// create a client.Request
request := &rpcRequest{
rw: &wrapStream{stream},
service: serviceFromMethod(fullMethod),
contentType: ct,
method: fmt.Sprintf("%s.%s", serviceName, methodName),
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
target: g.opts.Name,
codec: cf,
stream: true,
}
response := &rpcResponse{
method: fmt.Sprintf("%s.%s", serviceName, methodName),
endpoint: fmt.Sprintf("%s.%s", serviceName, methodName),
target: g.opts.Name,
rw: &wrapStream{stream},
header: make(map[string]string),
codec: cf,
}
// create a wrapped function
handler := func(ctx context.Context, req server.Request, rsp interface{}) error {
return g.opts.Router.ServeRequest(ctx, req, rsp.(server.Response))
}
// execute the wrapper for it
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
handler = g.opts.HdlrWrappers[i-1](handler)
}
r := grpcRouter{h: handler}
// serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil {
if _, ok := status.FromError(err); ok {
return err
}
return status.Errorf(codes.Internal, err.Error())
}
return nil
}
// process the standard request flow
g.rpc.mu.RLock()
svc := g.rpc.serviceMap[serviceName]
g.rpc.mu.RUnlock()
@@ -597,6 +565,9 @@ func (g *grpcServer) Options() server.Options {
}
func (g *grpcServer) Init(opts ...server.Option) error {
if len(opts) == 0 && g.init {
return nil
}
return g.configure(opts...)
}