initial http support

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-11-16 23:57:58 +03:00
parent f7127198d6
commit 1bba83e6fc
7 changed files with 229 additions and 78 deletions

View File

@@ -7,21 +7,27 @@ import (
"os/signal"
"time"
openapi_v3 "github.com/google/gnostic/openapiv3"
grpccli "go.unistack.org/micro-client-grpc/v3"
httpcli "go.unistack.org/micro-client-http/v3"
jsoncodec "go.unistack.org/micro-codec-json/v3"
jsonpbcodec "go.unistack.org/micro-codec-jsonpb/v3"
protocodec "go.unistack.org/micro-codec-proto/v3"
victoriametrics "go.unistack.org/micro-meter-victoriametrics/v3"
codecpb "go.unistack.org/micro-proto/v3/codec"
httpsrv "go.unistack.org/micro-server-http/v3"
healthhandler "go.unistack.org/micro-server-http/v3/handler/health"
meterhandler "go.unistack.org/micro-server-http/v3/handler/meter"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/logger/slog"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
"go.unistack.org/servicechecker/pkg/config"
"go.unistack.org/servicechecker/pkg/grpcconn"
"go.unistack.org/servicechecker/pkg/httpconn"
"go.unistack.org/servicechecker/pkg/scheduler"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
@@ -29,6 +35,8 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)
var clients = make(map[string]client.Client)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()
@@ -88,7 +96,6 @@ func main() {
l.Fatal(ctx, "failed to start http server", err)
}
clients := make(map[string]client.Client)
gcli := grpccli.NewClient(
client.Codec("application/json", jsonpbcodec.NewCodec()),
client.Codec("application/grpc", protocodec.NewCodec()),
@@ -98,83 +105,193 @@ func main() {
client.Retries(0),
// client.TLSConfig(&tls.Config{InsecureSkipVerify: true}),
)
protosetBuf, _ := os.ReadFile("card.protoset")
fdset := &descriptorpb.FileDescriptorSet{}
if err = protocodec.NewCodec().Unmarshal(protosetBuf, fdset); err != nil {
l.Fatal(ctx, "failed to unmarshal protoset file", err)
if err = gcli.Init(); err != nil {
l.Fatal(ctx, "failed to init grpc client", err)
}
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
pfiles, err := pfileoptions.NewFiles(fdset)
if err != nil {
l.Fatal(ctx, "failed to use protoset file", err)
}
gcli.Init()
clients["grpc"] = gcli
hcli := httpcli.NewClient(
client.Codec("text/html", codec.NewCodec()),
client.Codec("text/plain", codec.NewCodec()),
client.Codec("application/json", jsonpbcodec.NewCodec()),
client.ContentType("application/json"),
client.Retries(0),
// client.TLSConfig(&tls.Config{InsecureSkipVerify: true}),
)
if err = hcli.Init(); err != nil {
l.Fatal(ctx, "failed to init http client", err)
}
clients["http"] = hcli
for _, check := range cfg.Checks {
l.Info(ctx, fmt.Sprintf("check %#+v", check))
if !check.Active {
continue
}
for _, task := range check.Tasks {
l.Info(ctx, fmt.Sprintf("task %#+v", task))
if task.GRPC != nil {
c, ok := clients["grpc"]
if !ok {
l.Error(ctx, fmt.Sprintf("unknown client %s", "grpc"))
continue
}
pkg, svc, mth := grpcconn.ServiceMethod(task.GRPC.Endpoint)
pdesc, err := pfiles.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc))
if err != nil {
l.Error(ctx, "failed to find service "+pkg+"."+svc)
continue
}
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
l.Error(ctx, "failed to find service "+pkg+"."+svc)
continue
}
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
if mdesc == nil {
l.Error(ctx, "failed to find method "+mth)
continue
}
req := dynamicpb.NewMessageType(mdesc.Input()).New()
rsp := dynamicpb.NewMessageType(mdesc.Output()).New()
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(task.GRPC.Data), req); err != nil {
l.Error(ctx, "failed to unmarshal", err)
continue
}
treq := c.NewRequest(pkg, svc+"."+mth, req)
sch.NewJob(time.Duration(check.Interval), func() {
labels := []string{"check", check.Name, "task", task.Name, "service", svc, "endpoint", mth}
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now()
l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", svc, mth, task.GRPC.Addr))
err = grpcconn.Call(ctx, l, c, task.GRPC.Addr, time.Duration(task.Timeout),
treq,
rsp)
te := time.Since(ts)
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
m.Histogram(semconv.ClientRequestDurationSeconds, labels...).Update(te.Seconds())
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
if err != nil {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "failure")...).Inc()
} else {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
}
})
if !task.Active {
continue
}
var fn any
var args []any
switch {
case task.GRPC != nil:
fn, args, err = newGRPCTask(ctx, l, m, check.Name, task)
case task.HTTP != nil:
fn, args, err = newHTTPTask(ctx, l, m, check.Name, task)
}
if err != nil {
l.Error(ctx, "failed to create task", err)
continue
}
l.Info(ctx, fmt.Sprintf("add new task %v", fn))
sch.NewJob(time.Duration(check.Interval), fn, args...)
}
}
<-ctx.Done()
l.Info(ctx, "exiting")
}
func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) {
var err error
openapiBuf, err := os.ReadFile(task.HTTP.OpenAPI)
if err != nil {
l.Error(ctx, "failed to unmarshal openapi file", err)
return nil, nil, err
}
doc, err := openapi_v3.ParseDocument(openapiBuf)
if err != nil {
l.Error(ctx, "failed to unmarshal openapi file", err)
return nil, nil, err
}
_ = doc
c, ok := clients["http"]
if !ok {
err = fmt.Errorf("unknown client http")
l.Error(ctx, "failed to get client", err)
return nil, nil, err
}
errmap := make(map[string]interface{}, 1)
errmap["default"] = &codecpb.Frame{}
opts := []client.CallOption{
httpcli.ErrorMap(errmap),
httpcli.Method(task.HTTP.Method),
httpcli.Path(task.HTTP.Endpoint),
// client.WithContentType("application/json"),
}
rsp := &codecpb.Frame{}
treq := c.NewRequest(task.Name, task.Name, &codecpb.Frame{Data: []byte(task.HTTP.Data)})
fn := func() {
labels := []string{"check", check, "task", task.Name, "service", task.Name, "endpoint", task.Name}
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now()
l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", task.Name, task.Name, task.HTTP.Addr))
err = httpconn.Call(ctx, l, c, task.HTTP.Addr, time.Duration(task.Timeout),
treq,
rsp,
opts...)
te := time.Since(ts)
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
m.Histogram(semconv.ClientRequestDurationSeconds, labels...).Update(te.Seconds())
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
if err != nil {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "failure")...).Inc()
} else {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
}
}
return fn, nil, nil
}
func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) {
var err error
protosetBuf, err := os.ReadFile(task.GRPC.Protoset)
if err != nil {
l.Error(ctx, "failed to unmarshal protoset file", err)
return nil, nil, err
}
fdset := &descriptorpb.FileDescriptorSet{}
if err = protocodec.NewCodec().Unmarshal(protosetBuf, fdset); err != nil {
l.Error(ctx, "failed to unmarshal protoset file", err)
return nil, nil, err
}
pfileoptions := protodesc.FileOptions{AllowUnresolvable: true}
pfiles, err := pfileoptions.NewFiles(fdset)
if err != nil {
l.Error(ctx, "failed to use protoset file", err)
return nil, nil, err
}
c, ok := clients["grpc"]
if !ok {
err = fmt.Errorf("unknown client grpc")
l.Error(ctx, "failed to get client", err)
return nil, nil, err
}
pkg, svc, mth := grpcconn.ServiceMethod(task.GRPC.Endpoint)
pdesc, err := pfiles.FindDescriptorByName(protoreflect.FullName(pkg + "." + svc))
if err != nil {
l.Error(ctx, "failed to find service "+pkg+"."+svc)
return nil, nil, err
}
sdesc, ok := pdesc.(protoreflect.ServiceDescriptor)
if !ok {
err = fmt.Errorf("failed to find service " + pkg + "." + svc)
l.Error(ctx, "unable to find service in protoset", err)
return nil, nil, err
}
mdesc := sdesc.Methods().ByName(protoreflect.Name(mth))
if mdesc == nil {
err = fmt.Errorf("unknown method " + mth)
l.Error(ctx, "failed to find method", err)
return nil, nil, err
}
req := dynamicpb.NewMessageType(mdesc.Input()).New()
rsp := dynamicpb.NewMessageType(mdesc.Output()).New()
if err = jsonpbcodec.NewCodec().Unmarshal([]byte(task.GRPC.Data), req); err != nil {
l.Error(ctx, "failed to unmarshal", err)
return nil, nil, err
}
treq := c.NewRequest(pkg, svc+"."+mth, req)
fn := func() {
labels := []string{"check", check, "task", task.Name, "service", svc, "endpoint", mth}
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
ts := time.Now()
l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", svc, mth, task.GRPC.Addr))
err = grpcconn.Call(ctx, l, c, task.GRPC.Addr, time.Duration(task.Timeout),
treq,
rsp)
te := time.Since(ts)
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
m.Histogram(semconv.ClientRequestDurationSeconds, labels...).Update(te.Seconds())
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
if err != nil {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "failure")...).Inc()
} else {
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
}
}
return fn, nil, nil
}