update config

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-11-16 19:57:00 +03:00
parent f610dd4e47
commit 22db6d86ba
3 changed files with 53 additions and 35 deletions

View File

@@ -40,14 +40,18 @@ func main() {
l.Fatal(ctx, "failed to open config", err)
}
l.Info(ctx, "scheduler: create")
s, err := scheduler.NewScheduler()
if err != nil {
l.Fatal(ctx, "failed to create scheduler", err)
}
l.Info(ctx, "scheduler: created")
l.Info(ctx, "scheduler: try to start")
if err = s.Start(); err != nil {
l.Fatal(ctx, "failed to start scheduler", err)
}
l.Info(ctx, "scheduler: starting")
clients := make(map[string]client.Client)
gcli := grpccli.NewClient(
@@ -74,43 +78,48 @@ func main() {
gcli.Init()
clients["grpc"] = gcli
for _, svc := range cfg.Services {
c, ok := clients[svc.Type]
if !ok {
l.Error(ctx, fmt.Sprintf("unknown client %s", svc.Type))
continue
}
pdesc, err := pfiles.FindDescriptorByName("card_proto.CardService")
if err != nil {
l.Error(ctx, "failed to find service "+svc.Name)
continue
}
for _, check := range cfg.Checks {
l.Info(ctx, fmt.Sprintf("check %#+v", check))
for _, task := range check.Tasks {
l.Info(ctx, fmt.Sprintf("task %#+v", task))
c, ok := clients[task.Type]
if !ok {
l.Error(ctx, fmt.Sprintf("unknown client %s", task.Type))
continue
}
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
l.Error(ctx, "failed to find service "+svc.Name)
continue
}
pkg, svc, mth := grpcconn.ServiceMethod(task.Endpoint)
pdesc, err := pfiles.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc))
if err != nil {
l.Error(ctx, "failed to find service "+pkg+"."+svc)
continue
}
for _, check := range svc.Checks {
mdesc := sdesc.Methods().ByName(protoreflect.Name(check.Name))
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
l.Error(ctx, "failed to find service "+pkg+"."+svc)
continue
}
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
if mdesc == nil {
l.Error(ctx, "failed to find method "+check.Name)
l.Error(ctx, "failed to find method "+mth)
continue
}
req := dynamicpb.NewMessageType(mdesc.Input()).New()
rsp := dynamicpb.NewMessageType(mdesc.Output()).New()
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(check.Data), req); err != nil {
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(task.Data), req); err != nil {
l.Error(ctx, "failed to unmarshal", err)
continue
}
labels := []string{"service", svc.Name, "endpoint", check.Name}
labels := []string{"check", check.Name, "task", task.Name, "service", svc, "endpoint", mth}
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now()
err = grpcconn.Call(ctx, l, c, svc.Addr, time.Duration(check.Timeout),
c.NewRequest("card_proto", "CardService."+check.Name, req),
l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", svc, mth, task.Addr))
err = grpcconn.Call(ctx, l, c, task.Addr, time.Duration(task.Timeout),
c.NewRequest(pkg, svc+"."+mth, req),
rsp)
te := time.Since(ts)
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
@@ -121,6 +130,7 @@ func main() {
} else {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
}
}
}