tracer: improve tracing wrapper #240

Merged
vtolstov merged 1 commits from trracer into master 2023-09-01 15:25:47 +03:00
2 changed files with 63 additions and 100 deletions
Showing only changes of commit d3f9078bec - Show all commits

View File

@ -2,11 +2,9 @@ package tracer
import ( import (
"context" "context"
"reflect"
"go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/options" "go.unistack.org/micro/v4/options"
rutil "go.unistack.org/micro/v4/util/reflect"
) )
type SpanStatus int type SpanStatus int
@ -95,41 +93,12 @@ type EventOptions struct {
Labels []interface{} Labels []interface{}
} }
func WithSpanLabels(ls ...interface{}) options.Option { func WithEventLabels(labels ...interface{}) options.Option {
return func(src interface{}) error { return options.Labels(labels...)
v, err := options.Get(src, ".Labels")
if err != nil {
return err
} else if rutil.IsZero(v) {
v = reflect.MakeSlice(reflect.TypeOf(v), 0, len(ls)).Interface()
}
cv := reflect.ValueOf(v)
for _, l := range ls {
reflect.Append(cv, reflect.ValueOf(l))
}
err = options.Set(src, cv, ".Labels")
return err
}
} }
// EventOption func signature func WithSpanLabels(labels ...interface{}) options.Option {
type EventOption func(o *EventOptions) return options.Labels(labels...)
func WithEventLabels(ls ...interface{}) options.Option {
return func(src interface{}) error {
v, err := options.Get(src, ".Labels")
if err != nil {
return err
} else if rutil.IsZero(v) {
v = reflect.MakeSlice(reflect.TypeOf(v), 0, len(ls)).Interface()
}
cv := reflect.ValueOf(v)
for _, l := range ls {
reflect.Append(cv, reflect.ValueOf(l))
}
err = options.Set(src, cv, ".Labels")
return err
}
} }
func WithSpanKind(k SpanKind) options.Option { func WithSpanKind(k SpanKind) options.Option {

View File

@ -4,6 +4,7 @@ package wrapper // import "go.unistack.org/micro/v4/tracer/wrapper"
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"go.unistack.org/micro/v4/client" "go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
@ -12,13 +13,13 @@ import (
"go.unistack.org/micro/v4/tracer" "go.unistack.org/micro/v4/tracer"
) )
var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID} var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
func extractLabels(md metadata.Metadata) []string { func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
labels := make([]string, 0, 5) labels := make([]interface{}, 0, len(DefaultHeadersExctract))
for _, k := range DefaultHeadersExctract { for _, k := range DefaultHeadersExctract {
if v, ok := md.Get(k); ok { if v, ok := md.Get(k); ok {
labels = append(labels, k, v) labels = append(labels, strings.ToLower(k), v)
} }
} }
return labels return labels
@ -26,10 +27,9 @@ func extractLabels(md metadata.Metadata) []string {
var ( var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []options.Option, sp tracer.Span, err error) { DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []options.Option, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method()))
var labels []interface{} var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, extractLabels(md)) labels = append(labels, ExtractDefaultLabels(md)...)
} }
if err != nil { if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
@ -38,10 +38,9 @@ var (
} }
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []options.Option, stream client.Stream, sp tracer.Span, err error) { DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []options.Option, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method()))
var labels []interface{} var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, extractLabels(md)) labels = append(labels, ExtractDefaultLabels(md)...)
} }
if err != nil { if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
@ -50,10 +49,9 @@ var (
} }
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method()))
var labels []interface{} var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok { if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, extractLabels(md)) labels = append(labels, ExtractDefaultLabels(md)...)
} }
if err != nil { if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
@ -62,10 +60,10 @@ var (
} }
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
var labels []interface{} var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, extractLabels(md)) labels = append(labels, ExtractDefaultLabels(md)...)
} }
if err != nil { if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
@ -177,23 +175,22 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
} }
} }
sp, ok := tracer.SpanFromContext(ctx) nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels( tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc", "rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), "rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary", "rpc.call_type", "unary",
), ),
) )
}
defer sp.Finish() defer sp.Finish()
err := ot.Client.Call(ctx, req, rsp, opts...) err := ot.Client.Call(nctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers { for _, o := range ot.opts.ClientCallObservers {
o(ctx, req, rsp, opts, sp, err) o(nctx, req, rsp, opts, sp, err)
} }
return err return err
@ -207,23 +204,22 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...opti
} }
} }
sp, ok := tracer.SpanFromContext(ctx) nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels( tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc", "rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), "rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "stream", "rpc.call_type", "stream",
), ),
) )
}
defer sp.Finish() defer sp.Finish()
stream, err := ot.Client.Stream(ctx, req, opts...) stream, err := ot.Client.Stream(nctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers { for _, o := range ot.opts.ClientStreamObservers {
o(ctx, req, opts, stream, sp, err) o(nctx, req, opts, stream, sp, err)
} }
return stream, err return stream, err
@ -242,24 +238,22 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i
callType = "stream" callType = "stream"
} }
sp, ok := tracer.SpanFromContext(ctx) nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer), tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels( tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc", "rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), "rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", callType, "rpc.call_type", callType,
), ),
) )
}
defer sp.Finish() defer sp.Finish()
err := ot.serverHandler(ctx, req, rsp) err := ot.serverHandler(nctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers { for _, o := range ot.opts.ServerHandlerObservers {
o(ctx, req, rsp, sp, err) o(nctx, req, rsp, sp, err)
} }
return err return err
@ -297,23 +291,23 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.
} }
} }
sp, ok := tracer.SpanFromContext(ctx) nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
if !ok {
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels( tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc", "rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), "rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary", "rpc.call_type", "unary",
), ),
) )
}
defer sp.Finish() defer sp.Finish()
err := ot.clientCallFunc(ctx, addr, req, rsp, opts) err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers { for _, o := range ot.opts.ClientCallFuncObservers {
o(ctx, addr, req, rsp, opts, sp, err) o(nctx, addr, req, rsp, opts, sp, err)
} }
return err return err