add ability to work with protoset

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-11-16 15:21:52 +03:00
parent a2a241cd78
commit bac85a284d
8 changed files with 233 additions and 15 deletions

View File

@@ -2,22 +2,113 @@ package main
import (
"context"
"fmt"
"os"
"time"
grpccli "go.unistack.org/micro-client-grpc/v3"
jsonpbcodec "go.unistack.org/micro-codec-jsonpb/v3"
protocodec "go.unistack.org/micro-codec-proto/v3"
victoriametrics "go.unistack.org/micro-meter-victoriametrics/v3"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/logger/slog"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/servicechecker/pkg/config"
"go.unistack.org/servicechecker/pkg/grpcconn"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)
func main() {
ctx := context.Background()
log := slog.NewLogger()
l := slog.NewLogger()
l.Init()
m := victoriametrics.NewMeter()
f, err := os.Open("config.yaml")
if err != nil {
log.Fatal(ctx, "failed to open config", err)
l.Fatal(ctx, "failed to open config", err)
}
defer f.Close()
cfg := &config.Config{}
if err = cfg.Parse(f); err != nil {
log.Fatal(ctx, "failed to open config", err)
l.Fatal(ctx, "failed to open config", err)
}
clients := make(map[string]client.Client)
gcli := grpccli.NewClient(
client.Codec("application/json", jsonpbcodec.NewCodec()),
client.Codec("application/grpc", protocodec.NewCodec()),
client.Codec("application/grpc+proto", protocodec.NewCodec()),
client.Codec("application/grpc+json", jsonpbcodec.NewCodec()),
client.ContentType("application/grpc"),
client.Retries(0),
// client.TLSConfig(&tls.Config{InsecureSkipVerify: true}),
)
protosetBuf, _ := os.ReadFile("card.protoset")
fdset := &descriptorpb.FileDescriptorSet{}
if err = protocodec.NewCodec().Unmarshal(protosetBuf, fdset); err != nil {
l.Fatal(ctx, "failed to unmarshal protoset file", err)
}
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
pfiles, err := pfileoptions.NewFiles(fdset)
if err != nil {
l.Fatal(ctx, "failed to use protoset file", err)
}
gcli.Init()
clients["grpc"] = gcli
for _, svc := range cfg.Services {
c, ok := clients[svc.Type]
if !ok {
l.Error(ctx, fmt.Sprintf("unknown client %s", svc.Type))
continue
}
pdesc, err := pfiles.FindDescriptorByName("card_proto.CardService")
if err != nil {
l.Error(ctx, "failed to find service "+svc.Name)
continue
}
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
l.Error(ctx, "failed to find service "+svc.Name)
continue
}
for _, check := range svc.Checks {
mdesc := sdesc.Methods().ByName(protoreflect.Name(check.Name))
if mdesc == nil {
l.Error(ctx, "failed to find method "+check.Name)
continue
}
req := dynamicpb.NewMessageType(mdesc.Input()).New()
rsp := dynamicpb.NewMessageType(mdesc.Output()).New()
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(check.Data), req); err != nil {
l.Error(ctx, "failed to unmarshal", err)
continue
}
labels := []string{"service", svc.Name, "endpoint", check.Name}
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now()
err = grpcconn.Call(ctx, l, c, svc.Addr, time.Duration(check.Timeout),
c.NewRequest("card_proto", "CardService."+check.Name, req),
rsp)
te := time.Since(ts)
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
m.Histogram(semconv.ClientRequestDurationSeconds, labels...).Update(te.Seconds())
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
if err != nil {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "failure")...).Inc()
} else {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
}
}
}
m.Write(os.Stdout)
}