add multi user config field, fixup request-id generation
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
openapi_v3 "github.com/google/gnostic/openapiv3"
|
||||
"github.com/google/uuid"
|
||||
grpccli "go.unistack.org/micro-client-grpc/v3"
|
||||
httpcli "go.unistack.org/micro-client-http/v3"
|
||||
jsoncodec "go.unistack.org/micro-codec-json/v3"
|
||||
@@ -156,7 +157,7 @@ func main() {
|
||||
l.Info(ctx, "exiting")
|
||||
}
|
||||
|
||||
func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) {
|
||||
func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.TaskConfig) (any, []any, error) {
|
||||
var err error
|
||||
|
||||
c, ok := clients["http"]
|
||||
@@ -193,6 +194,7 @@ func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
|
||||
httpcli.ErrorMap(errmap),
|
||||
httpcli.Method(task.HTTP.Method),
|
||||
httpcli.Path(task.HTTP.Endpoint),
|
||||
|
||||
// client.WithContentType("application/json"),
|
||||
}
|
||||
|
||||
@@ -205,18 +207,39 @@ func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
|
||||
}
|
||||
|
||||
fn := func() {
|
||||
var cerr error
|
||||
|
||||
metadata := make(map[string]string, len(task.HTTP.Metadata))
|
||||
var rquid string
|
||||
for k, v := range task.HTTP.Metadata {
|
||||
if k == "x-request-id" && v == "generate" {
|
||||
uid, err := uuid.NewV7()
|
||||
if err != nil {
|
||||
l.Error(ctx, "failed to generate x-request-id", err)
|
||||
uid = uuid.Nil
|
||||
} else {
|
||||
v = uid.String()
|
||||
}
|
||||
}
|
||||
metadata[k] = v
|
||||
rquid = v
|
||||
}
|
||||
|
||||
l.Info(ctx, fmt.Sprintf("call %s.%s endpoint %s", treq.Service(), treq.Method(), treq.Endpoint()), "x-request-id", rquid)
|
||||
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
// l.Info(ctx, fmt.Sprintf("try to call %s.%s via %s", task.Name, task.Name, task.HTTP.Addr))
|
||||
err = httpconn.Call(ctx, l, c, task.HTTP.Addr, time.Duration(task.Timeout),
|
||||
cerr = httpconn.Call(ctx, rquid, l, c, task.HTTP.Addr, time.Duration(task.Timeout),
|
||||
treq,
|
||||
rsp,
|
||||
opts...)
|
||||
append(opts, client.WithRequestMetadata(metadata))...,
|
||||
)
|
||||
te := time.Since(ts)
|
||||
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
|
||||
|
||||
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
m.Histogram(semconv.ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
|
||||
if err != nil {
|
||||
|
||||
if cerr != nil {
|
||||
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "failure")...).Inc()
|
||||
} else {
|
||||
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
|
||||
@@ -226,7 +249,7 @@ func newHTTPTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
|
||||
return fn, nil, nil
|
||||
}
|
||||
|
||||
func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.Task) (any, []any, error) {
|
||||
func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check string, task *config.TaskConfig) (any, []any, error) {
|
||||
var err error
|
||||
|
||||
c, ok := clients["grpc"]
|
||||
@@ -296,17 +319,38 @@ func newGRPCTask(ctx context.Context, l logger.Logger, m meter.Meter, check stri
|
||||
}
|
||||
|
||||
fn := func() {
|
||||
var cerr error
|
||||
var rquid string
|
||||
metadata := make(map[string]string, len(task.HTTP.Metadata))
|
||||
for k, v := range task.GRPC.Metadata {
|
||||
if k == "x-request-id" && v == "generate" {
|
||||
uid, err := uuid.NewV7()
|
||||
if err != nil {
|
||||
l.Error(ctx, "failed to generate x-request-id", err)
|
||||
uid = uuid.Nil
|
||||
} else {
|
||||
v = uid.String()
|
||||
}
|
||||
}
|
||||
metadata[k] = v
|
||||
rquid = v
|
||||
}
|
||||
|
||||
l.Info(ctx, fmt.Sprintf("call %s.%s endpoint %s", treq.Service(), treq.Method(), treq.Endpoint()), "x-request-id", rquid)
|
||||
m.Counter(semconv.ClientRequestInflight, labels...).Inc()
|
||||
ts := time.Now()
|
||||
|
||||
err = grpcconn.Call(ctx, l, c, task.GRPC.Addr, time.Duration(task.Timeout),
|
||||
cerr = grpcconn.Call(ctx, rquid, l, c, task.GRPC.Addr, time.Duration(task.Timeout),
|
||||
treq,
|
||||
rsp)
|
||||
rsp,
|
||||
client.WithRequestMetadata(metadata),
|
||||
)
|
||||
te := time.Since(ts)
|
||||
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
|
||||
|
||||
m.Summary(semconv.ClientRequestLatencyMicroseconds, labels...).Update(te.Seconds())
|
||||
m.Histogram(semconv.ClientRequestDurationSeconds, labels...).Update(te.Seconds())
|
||||
m.Counter(semconv.ClientRequestInflight, labels...).Dec()
|
||||
if err != nil {
|
||||
|
||||
if cerr != nil {
|
||||
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "failure")...).Inc()
|
||||
} else {
|
||||
m.Counter(semconv.ClientRequestTotal, append(labels, "status", "success")...).Inc()
|
||||
|
Reference in New Issue
Block a user