tracer: tweaks for span tags and naming #239
@ -3,6 +3,8 @@ package tracer // import "go.unistack.org/micro/v3/tracer"
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultTracer is the global default tracer
|
// DefaultTracer is the global default tracer
|
||||||
@ -42,3 +44,37 @@ type Span interface {
|
|||||||
// Kind returns span kind
|
// Kind returns span kind
|
||||||
Kind() SpanKind
|
Kind() SpanKind
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sort labels alphabeticaly by label name
|
||||||
|
type byKey []interface{}
|
||||||
|
|
||||||
|
func (k byKey) Len() int { return len(k) / 2 }
|
||||||
|
func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) }
|
||||||
|
func (k byKey) Swap(i, j int) {
|
||||||
|
k[i*2], k[j*2] = k[j*2], k[i*2]
|
||||||
|
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func UniqLabels(labels []interface{}) []interface{} {
|
||||||
|
if len(labels)%2 == 1 {
|
||||||
|
labels = labels[:len(labels)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(labels) > 2 {
|
||||||
|
sort.Sort(byKey(labels))
|
||||||
|
|
||||||
|
idx := 0
|
||||||
|
for {
|
||||||
|
if labels[idx] == labels[idx+2] {
|
||||||
|
copy(labels[idx:], labels[idx+2:])
|
||||||
|
labels = labels[:len(labels)-2]
|
||||||
|
} else {
|
||||||
|
idx += 2
|
||||||
|
}
|
||||||
|
if idx+2 >= len(labels) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
13
tracer/tracer_test.go
Normal file
13
tracer/tracer_test.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package tracer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUniqLabels(t *testing.T) {
|
||||||
|
labels := []interface{}{"key1", "val1", "key1", "val2"}
|
||||||
|
labels = UniqLabels(labels)
|
||||||
|
if labels[1] != "val2" {
|
||||||
|
t.Fatalf("UniqLabels not works")
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@ package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"go.unistack.org/micro/v3/client"
|
"go.unistack.org/micro/v3/client"
|
||||||
"go.unistack.org/micro/v3/metadata"
|
"go.unistack.org/micro/v3/metadata"
|
||||||
@ -11,13 +12,13 @@ import (
|
|||||||
"go.unistack.org/micro/v3/tracer"
|
"go.unistack.org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID}
|
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
|
||||||
|
|
||||||
func extractLabels(md metadata.Metadata) []string {
|
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
|
||||||
labels := make([]string, 0, 5)
|
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
|
||||||
for _, k := range DefaultHeadersExctract {
|
for _, k := range DefaultHeadersExctract {
|
||||||
if v, ok := md.Get(k); ok {
|
if v, ok := md.Get(k); ok {
|
||||||
labels = append(labels, k, v)
|
labels = append(labels, strings.ToLower(k), v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return labels
|
return labels
|
||||||
@ -25,10 +26,9 @@ func extractLabels(md metadata.Metadata) []string {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
||||||
sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method()))
|
|
||||||
var labels []interface{}
|
var labels []interface{}
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
labels = append(labels, extractLabels(md))
|
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
@ -37,10 +37,9 @@ var (
|
|||||||
}
|
}
|
||||||
|
|
||||||
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
||||||
sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method()))
|
|
||||||
var labels []interface{}
|
var labels []interface{}
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
labels = append(labels, extractLabels(md))
|
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
@ -49,11 +48,11 @@ var (
|
|||||||
}
|
}
|
||||||
|
|
||||||
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
||||||
sp.SetName(fmt.Sprintf("Publish %s", msg.Topic()))
|
|
||||||
var labels []interface{}
|
var labels []interface{}
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
labels = append(labels, extractLabels(md))
|
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||||
}
|
}
|
||||||
|
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
}
|
}
|
||||||
@ -61,10 +60,9 @@ var (
|
|||||||
}
|
}
|
||||||
|
|
||||||
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
||||||
sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method()))
|
|
||||||
var labels []interface{}
|
var labels []interface{}
|
||||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||||
labels = append(labels, extractLabels(md))
|
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
@ -73,11 +71,12 @@ var (
|
|||||||
}
|
}
|
||||||
|
|
||||||
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
||||||
sp.SetName(fmt.Sprintf("Subscriber %s", msg.Topic()))
|
|
||||||
var labels []interface{}
|
var labels []interface{}
|
||||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||||
labels = append(labels, extractLabels(md))
|
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||||
}
|
}
|
||||||
|
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
}
|
}
|
||||||
@ -85,10 +84,10 @@ var (
|
|||||||
}
|
}
|
||||||
|
|
||||||
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
||||||
sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method()))
|
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
|
||||||
var labels []interface{}
|
var labels []interface{}
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
labels = append(labels, extractLabels(md))
|
labels = append(labels, ExtractDefaultLabels(md)...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
@ -223,23 +222,22 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sp, ok := tracer.SpanFromContext(ctx)
|
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
|
||||||
if !ok {
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client",
|
tracer.WithSpanLabels(
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
"rpc.service", req.Service(),
|
||||||
tracer.WithSpanLabels(
|
"rpc.method", req.Method(),
|
||||||
"rpc.flavor", "rpc",
|
"rpc.flavor", "rpc",
|
||||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||||
"rpc.call_type", "unary",
|
"rpc.call_type", "unary",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
|
||||||
err := ot.Client.Call(ctx, req, rsp, opts...)
|
err := ot.Client.Call(nctx, req, rsp, opts...)
|
||||||
|
|
||||||
for _, o := range ot.opts.ClientCallObservers {
|
for _, o := range ot.opts.ClientCallObservers {
|
||||||
o(ctx, req, rsp, opts, sp, err)
|
o(nctx, req, rsp, opts, sp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -253,39 +251,36 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sp, ok := tracer.SpanFromContext(ctx)
|
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
|
||||||
if !ok {
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client",
|
tracer.WithSpanLabels(
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
"rpc.service", req.Service(),
|
||||||
tracer.WithSpanLabels(
|
"rpc.method", req.Method(),
|
||||||
"rpc.flavor", "rpc",
|
"rpc.flavor", "rpc",
|
||||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||||
"rpc.call_type", "stream",
|
"rpc.call_type", "stream",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
|
||||||
stream, err := ot.Client.Stream(ctx, req, opts...)
|
stream, err := ot.Client.Stream(nctx, req, opts...)
|
||||||
|
|
||||||
for _, o := range ot.opts.ClientStreamObservers {
|
for _, o := range ot.opts.ClientStreamObservers {
|
||||||
o(ctx, req, opts, stream, sp, err)
|
o(nctx, req, opts, stream, sp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return stream, err
|
return stream, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
|
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
|
||||||
sp, ok := tracer.SpanFromContext(ctx)
|
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
|
||||||
if !ok {
|
|
||||||
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindProducer))
|
|
||||||
}
|
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
sp.AddLabels("messaging.destination.name", msg.Topic())
|
||||||
err := ot.Client.Publish(ctx, msg, opts...)
|
sp.AddLabels("messaging.operation", "publish")
|
||||||
|
err := ot.Client.Publish(nctx, msg, opts...)
|
||||||
|
|
||||||
for _, o := range ot.opts.ClientPublishObservers {
|
for _, o := range ot.opts.ClientPublishObservers {
|
||||||
o(ctx, msg, opts, sp, err)
|
o(nctx, msg, opts, sp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -304,40 +299,36 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i
|
|||||||
callType = "stream"
|
callType = "stream"
|
||||||
}
|
}
|
||||||
|
|
||||||
sp, ok := tracer.SpanFromContext(ctx)
|
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
|
||||||
if !ok {
|
tracer.WithSpanKind(tracer.SpanKindServer),
|
||||||
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server",
|
tracer.WithSpanLabels(
|
||||||
tracer.WithSpanKind(tracer.SpanKindServer),
|
"rpc.service", req.Service(),
|
||||||
tracer.WithSpanLabels(
|
"rpc.method", req.Method(),
|
||||||
"rpc.flavor", "rpc",
|
"rpc.flavor", "rpc",
|
||||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||||
"rpc.call_type", callType,
|
"rpc.call_type", callType,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
|
||||||
|
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
|
||||||
err := ot.serverHandler(ctx, req, rsp)
|
err := ot.serverHandler(nctx, req, rsp)
|
||||||
|
|
||||||
for _, o := range ot.opts.ServerHandlerObservers {
|
for _, o := range ot.opts.ServerHandlerObservers {
|
||||||
o(ctx, req, rsp, sp, err)
|
o(nctx, req, rsp, sp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
|
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
|
||||||
sp, ok := tracer.SpanFromContext(ctx)
|
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
|
||||||
if !ok {
|
|
||||||
ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindConsumer))
|
|
||||||
}
|
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
sp.AddLabels("messaging.operation", "process")
|
||||||
err := ot.serverSubscriber(ctx, msg)
|
sp.AddLabels("messaging.source.name", msg.Topic())
|
||||||
|
err := ot.serverSubscriber(nctx, msg)
|
||||||
|
|
||||||
for _, o := range ot.opts.ServerSubscriberObservers {
|
for _, o := range ot.opts.ServerSubscriberObservers {
|
||||||
o(ctx, msg, sp, err)
|
o(nctx, msg, sp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -375,23 +366,23 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sp, ok := tracer.SpanFromContext(ctx)
|
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
|
||||||
if !ok {
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client",
|
tracer.WithSpanLabels(
|
||||||
tracer.WithSpanKind(tracer.SpanKindClient),
|
"rpc.service", req.Service(),
|
||||||
tracer.WithSpanLabels(
|
"rpc.method", req.Method(),
|
||||||
"rpc.flavor", "rpc",
|
"rpc.flavor", "rpc",
|
||||||
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
|
||||||
"rpc.call_type", "unary",
|
"rpc.call_type", "unary",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
}
|
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
|
||||||
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)
|
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
|
||||||
|
|
||||||
for _, o := range ot.opts.ClientCallFuncObservers {
|
for _, o := range ot.opts.ClientCallFuncObservers {
|
||||||
o(ctx, addr, req, rsp, opts, sp, err)
|
o(nctx, addr, req, rsp, opts, sp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user