Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2024-11-17 13:16:28 +03:00
parent 1bba83e6fc
commit 09c107ac3e

View File

@ -159,19 +159,6 @@ func main() {
func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) { func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) {
var err error var err error
openapiBuf, err := os.ReadFile(task.HTTP.OpenAPI)
if err != nil {
l.Error(ctx, "failed to unmarshal openapi file", err)
return nil, nil, err
}
doc, err := openapi_v3.ParseDocument(openapiBuf)
if err != nil {
l.Error(ctx, "failed to unmarshal openapi file", err)
return nil, nil, err
}
_ = doc
c, ok := clients["http"] c, ok := clients["http"]
if !ok { if !ok {
err = fmt.Errorf("unknown client http") err = fmt.Errorf("unknown client http")
@ -179,23 +166,48 @@ func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
return nil, nil, err return nil, nil, err
} }
errmap := make(map[string]interface{}, 1) var req interface{}
errmap["default"] = &codecpb.Frame{} var rsp interface{}
opts := []client.CallOption{ var treq client.Request
httpcli.ErrorMap(errmap), var opts []client.CallOption
httpcli.Method(task.HTTP.Method), var labels []string
httpcli.Path(task.HTTP.Endpoint),
// client.WithContentType("application/json"),
}
rsp := &codecpb.Frame{}
treq := c.NewRequest(task.Name, task.Name, &codecpb.Frame{Data: []byte(task.HTTP.Data)}) if task.HTTP.OpenAPI != "" {
openapiBuf, err := os.ReadFile(task.HTTP.OpenAPI)
if err != nil {
l.Error(ctx, "failed to unmarshal openapi file", err)
return nil, nil, err
}
doc, err := openapi_v3.ParseDocument(openapiBuf)
if err != nil {
l.Error(ctx, "failed to unmarshal openapi file", err)
return nil, nil, err
}
_ = doc
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codecpb.Frame{}
opts = []client.CallOption{
httpcli.ErrorMap(errmap),
httpcli.Method(task.HTTP.Method),
httpcli.Path(task.HTTP.Endpoint),
// client.WithContentType("application/json"),
}
req = &codecpb.Frame{Data: []byte(task.HTTP.Data)}
rsp = &codecpb.Frame{}
treq = c.NewRequest(task.Name, task.Name, req)
labels = []string{"check", check, "task", task.Name, "service", task.Name, "endpoint", task.Name}
}
fn := func() { fn := func() {
labels := []string{"check", check, "task", task.Name, "service", task.Name, "endpoint", task.Name}
m.Counter(semconv.ClientRequestInflight, labels...).Inc() m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", task.Name, task.Name, task.HTTP.Addr)) // l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", task.Name, task.Name, task.HTTP.Addr))
err = httpconn.Call(ctx, l, c, task.HTTP.Addr, time.Duration(task.Timeout), err = httpconn.Call(ctx, l, c, task.HTTP.Addr, time.Duration(task.Timeout),
treq, treq,
rsp, rsp,
@ -217,25 +229,6 @@ func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) { func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) {
var err error var err error
protosetBuf, err := os.ReadFile(task.GRPC.Protoset)
if err != nil {
l.Error(ctx, "failed to unmarshal protoset file", err)
return nil, nil, err
}
fdset := &descriptorpb.FileDescriptorSet{}
if err = protocodec.NewCodec().Unmarshal(protosetBuf, fdset); err != nil {
l.Error(ctx, "failed to unmarshal protoset file", err)
return nil, nil, err
}
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
pfiles, err := pfileoptions.NewFiles(fdset)
if err != nil {
l.Error(ctx, "failed to use protoset file", err)
return nil, nil, err
}
c, ok := clients["grpc"] c, ok := clients["grpc"]
if !ok { if !ok {
err = fmt.Errorf("unknown client grpc") err = fmt.Errorf("unknown client grpc")
@ -243,42 +236,69 @@ func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
return nil, nil, err return nil, nil, err
} }
pkg, svc, mth := grpcconn.ServiceMethod(task.GRPC.Endpoint) var req interface{}
pdesc, err := pfiles.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc)) var rsp interface{}
if err != nil { var treq client.Request
l.Error(ctx, "failed to find service "+pkg+"."+svc) var labels []string
return nil, nil, err
if task.GRPC.Protoset != "" {
protosetBuf, err := os.ReadFile(task.GRPC.Protoset)
if err != nil {
l.Error(ctx, "failed to unmarshal protoset file", err)
return nil, nil, err
}
fdset := &descriptorpb.FileDescriptorSet{}
if err = protocodec.NewCodec().Unmarshal(protosetBuf, fdset); err != nil {
l.Error(ctx, "failed to unmarshal protoset file", err)
return nil, nil, err
}
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
pfiles, err := pfileoptions.NewFiles(fdset)
if err != nil {
l.Error(ctx, "failed to use protoset file", err)
return nil, nil, err
}
pkg, svc, mth := grpcconn.ServiceMethod(task.GRPC.Endpoint)
pdesc, err := pfiles.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc))
if err != nil {
l.Error(ctx, "failed to find service "+pkg+"."+svc)
return nil, nil, err
}
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
err = fmt.Errorf("failed to find service " + pkg + "." + svc)
l.Error(ctx, "unable to find service in protoset", err)
return nil, nil, err
}
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
if mdesc == nil {
err = fmt.Errorf("unknown method " + mth)
l.Error(ctx, "failed to find method", err)
return nil, nil, err
}
req = dynamicpb.NewMessageType(mdesc.Input()).New()
rsp = dynamicpb.NewMessageType(mdesc.Output()).New()
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(task.GRPC.Data), req); err != nil {
l.Error(ctx, "failed to unmarshal", err)
return nil, nil, err
}
treq = c.NewRequest(pkg, svc+"."+mth, req)
labels = []string{"check", check, "task", task.Name, "service", svc, "endpoint", mth}
} }
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
err = fmt.Errorf("failed to find service " + pkg + "." + svc)
l.Error(ctx, "unable to find service in protoset", err)
return nil, nil, err
}
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
if mdesc == nil {
err = fmt.Errorf("unknown method " + mth)
l.Error(ctx, "failed to find method", err)
return nil, nil, err
}
req := dynamicpb.NewMessageType(mdesc.Input()).New()
rsp := dynamicpb.NewMessageType(mdesc.Output()).New()
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(task.GRPC.Data), req); err != nil {
l.Error(ctx, "failed to unmarshal", err)
return nil, nil, err
}
treq := c.NewRequest(pkg, svc+"."+mth, req)
fn := func() { fn := func() {
labels := []string{"check", check, "task", task.Name, "service", svc, "endpoint", mth}
m.Counter(semconv.ClientRequestInflight, labels...).Inc() m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now() ts := time.Now()
l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", svc, mth, task.GRPC.Addr))
err = grpcconn.Call(ctx, l, c, task.GRPC.Addr, time.Duration(task.Timeout), err = grpcconn.Call(ctx, l, c, task.GRPC.Addr, time.Duration(task.Timeout),
treq, treq,
rsp) rsp)