#1 #5
@ -174,11 +174,10 @@ func main() {
|
|||||||
|
|
||||||
var fn any
|
var fn any
|
||||||
var args []any
|
var args []any
|
||||||
p := protoset.NewProtoSet(l)
|
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case task.GRPC != nil:
|
case task.GRPC != nil:
|
||||||
fn, args, err = newGRPCTask(ctx, l, mtr, p, check.Name, task)
|
fn, args, err = newGRPCTask(ctx, l, mtr, check.Name, task)
|
||||||
case task.HTTP != nil:
|
case task.HTTP != nil:
|
||||||
fn, args, err = newHTTPTask(ctx, l, mtr, check.Name, task)
|
fn, args, err = newHTTPTask(ctx, l, mtr, check.Name, task)
|
||||||
case task.GraphQL != nil:
|
case task.GraphQL != nil:
|
||||||
@ -291,7 +290,7 @@ func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
|
|||||||
return fn, nil, nil
|
return fn, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, p *protoset.ProtoSet, check string, task *config.TaskConfig) (any, []any, error) {
|
func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.TaskConfig) (any, []any, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
c, ok := clients["grpc"]
|
c, ok := clients["grpc"]
|
||||||
@ -305,6 +304,7 @@ func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, p *protose
|
|||||||
var rsp interface{}
|
var rsp interface{}
|
||||||
var treq client.Request
|
var treq client.Request
|
||||||
var labels []string
|
var labels []string
|
||||||
|
p := protoset.NewProtoSet()
|
||||||
|
|
||||||
if task.GRPC.Protoset != "" {
|
if task.GRPC.Protoset != "" {
|
||||||
pkg, svc, mth := grpcconn.ServiceMethod(task.GRPC.Endpoint)
|
pkg, svc, mth := grpcconn.ServiceMethod(task.GRPC.Endpoint)
|
||||||
@ -315,12 +315,12 @@ func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, p *protose
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = p.AddProtoset(ctx, task.GRPC.Addr, svc, protosetBuf); err != nil {
|
if err = p.AddProtoset(task.GRPC.Addr, svc, protosetBuf); err != nil {
|
||||||
l.Error(ctx, "failed add protoset", err)
|
l.Error(ctx, "failed add protoset", err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
req, rsp, err = p.GetMessage(ctx, task.GRPC.Addr, pkg, svc, mth)
|
req, rsp, err = p.GetMessage(task.GRPC.Addr, pkg, svc, mth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Error(ctx, "failed get req, rsp from protoset", err)
|
l.Error(ctx, "failed get req, rsp from protoset", err)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
protocodec "go.unistack.org/micro-codec-proto/v3"
|
protocodec "go.unistack.org/micro-codec-proto/v3"
|
||||||
"go.unistack.org/micro/v3/logger"
|
|
||||||
"google.golang.org/protobuf/reflect/protodesc"
|
"google.golang.org/protobuf/reflect/protodesc"
|
||||||
"google.golang.org/protobuf/reflect/protoreflect"
|
"google.golang.org/protobuf/reflect/protoreflect"
|
||||||
"google.golang.org/protobuf/reflect/protoregistry"
|
"google.golang.org/protobuf/reflect/protoregistry"
|
||||||
@ -20,49 +19,39 @@ var errNotFound = errors.New("file descriptor not found")
|
|||||||
type ProtoSet struct {
|
type ProtoSet struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
files map[string]*protoregistry.Files
|
files map[string]*protoregistry.Files
|
||||||
log logger.Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProtoSet(log logger.Logger) *ProtoSet {
|
func NewProtoSet() *ProtoSet {
|
||||||
return &ProtoSet{
|
return &ProtoSet{
|
||||||
mu: sync.Mutex{},
|
mu: sync.Mutex{},
|
||||||
files: make(map[string]*protoregistry.Files, 0),
|
files: make(map[string]*protoregistry.Files, 0),
|
||||||
log: log,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProtoSet) GetMessage(ctx context.Context, addr, pkg, svc, mth string) (protoreflect.Message, protoreflect.Message, error) {
|
func (p *ProtoSet) GetMessage(addr, pkg, svc, mth string) (protoreflect.Message, protoreflect.Message, error) {
|
||||||
p.log.Debug(ctx, "start of GetMessage")
|
|
||||||
if addr == "" || svc == "" || mth == "" {
|
if addr == "" || svc == "" || mth == "" {
|
||||||
p.log.Error(ctx, "protoset: empty addr or service param")
|
|
||||||
return nil, nil, errors.New("addr or service name is empty")
|
return nil, nil, errors.New("addr or service name is empty")
|
||||||
}
|
}
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
pfile, ok := p.files[addr+"|"+svc]
|
pfile, ok := p.files[addr+"|"+svc]
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
if !ok || pfile == nil {
|
if !ok || pfile == nil {
|
||||||
p.log.Error(ctx, "protoset: file desc not found")
|
|
||||||
return nil, nil, errNotFound
|
return nil, nil, errNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
pdesc, err := pfile.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc))
|
pdesc, err := pfile.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(ctx, "failed to find service "+pkg+"."+svc)
|
return nil, nil, fmt.Errorf("failed to find service %s.%s, err: %s", pkg, svc, err)
|
||||||
return nil, nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
|
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
|
||||||
if !ok {
|
if !ok {
|
||||||
err = fmt.Errorf("failed to find service " + pkg + "." + svc)
|
return nil, nil, fmt.Errorf("failed to find service " + pkg + "." + svc)
|
||||||
p.log.Error(ctx, "unable to find service in protoset", err)
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
|
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
|
||||||
if mdesc == nil {
|
if mdesc == nil {
|
||||||
err = fmt.Errorf("unknown method " + mth)
|
return nil, nil, fmt.Errorf("unknown method " + mth)
|
||||||
p.log.Error(ctx, "failed to find method", err)
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
req := dynamicpb.NewMessageType(mdesc.Input()).New()
|
req := dynamicpb.NewMessageType(mdesc.Input()).New()
|
||||||
@ -71,20 +60,16 @@ func (p *ProtoSet) GetMessage(ctx context.Context, addr, pkg, svc, mth string) (
|
|||||||
return req, rsp, nil
|
return req, rsp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProtoSet) AddProtoset(ctx context.Context, addr, svc string, data []byte) error {
|
func (p *ProtoSet) AddProtoset(addr, svc string, data []byte) error {
|
||||||
p.log.Debug(ctx, "start of AddProtoset")
|
|
||||||
|
|
||||||
fdset := &descriptorpb.FileDescriptorSet{}
|
fdset := &descriptorpb.FileDescriptorSet{}
|
||||||
if err := protocodec.NewCodec().Unmarshal(data, fdset); err != nil {
|
if err := protocodec.NewCodec().Unmarshal(data, fdset); err != nil {
|
||||||
p.log.Error(ctx, "failed to unmarshal protoset file", err)
|
return fmt.Errorf("failed to unmarshal protoset file: %s", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
|
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
|
||||||
pfiles, err := pfileoptions.NewFiles(fdset)
|
pfiles, err := pfileoptions.NewFiles(fdset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error(ctx, "failed to use protoset file", err)
|
return fmt.Errorf("failed to use protoset file, err: %s", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
@ -95,6 +80,5 @@ func (p *ProtoSet) AddProtoset(ctx context.Context, addr, svc string, data []byt
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProtoSet) AddReflection(ctx context.Context, service string, addr string) error {
|
func (p *ProtoSet) AddReflection(ctx context.Context, service string, addr string) error {
|
||||||
p.log.Debug(ctx, "start of AddReflection")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user