diff --git a/tracer/tracer.go b/tracer/tracer.go index 3d8af485..0646dab7 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -3,6 +3,8 @@ package tracer // import "go.unistack.org/micro/v3/tracer" import ( "context" + "fmt" + "sort" ) // DefaultTracer is the global default tracer @@ -42,3 +44,37 @@ type Span interface { // Kind returns span kind Kind() SpanKind } + +// sort labels alphabeticaly by label name +type byKey []interface{} + +func (k byKey) Len() int { return len(k) / 2 } +func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } +func (k byKey) Swap(i, j int) { + k[i*2], k[j*2] = k[j*2], k[i*2] + k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] +} + +func UniqLabels(labels []interface{}) []interface{} { + if len(labels)%2 == 1 { + labels = labels[:len(labels)-1] + } + + if len(labels) > 2 { + sort.Sort(byKey(labels)) + + idx := 0 + for { + if labels[idx] == labels[idx+2] { + copy(labels[idx:], labels[idx+2:]) + labels = labels[:len(labels)-2] + } else { + idx += 2 + } + if idx+2 >= len(labels) { + break + } + } + } + return labels +} diff --git a/tracer/tracer_test.go b/tracer/tracer_test.go new file mode 100644 index 00000000..7388f577 --- /dev/null +++ b/tracer/tracer_test.go @@ -0,0 +1,13 @@ +package tracer + +import ( + "testing" +) + +func TestUniqLabels(t *testing.T) { + labels := []interface{}{"key1", "val1", "key1", "val2"} + labels = UniqLabels(labels) + if labels[1] != "val2" { + t.Fatalf("UniqLabels not works") + } +} diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go index 58d2a00e..0190c4e4 100644 --- a/tracer/wrapper/wrapper.go +++ b/tracer/wrapper/wrapper.go @@ -4,6 +4,7 @@ package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper" import ( "context" "fmt" + "strings" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/metadata" @@ -11,13 +12,13 @@ import ( "go.unistack.org/micro/v3/tracer" ) -var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID} +var DefaultHeadersExctract = []string{metadata.HeaderXRequestID} -func extractLabels(md metadata.Metadata) []string { - labels := make([]string, 0, 5) +func ExtractDefaultLabels(md metadata.Metadata) []interface{} { + labels := make([]interface{}, 0, len(DefaultHeadersExctract)) for _, k := range DefaultHeadersExctract { if v, ok := md.Get(k); ok { - labels = append(labels, k, v) + labels = append(labels, strings.ToLower(k), v) } } return labels @@ -25,10 +26,9 @@ func extractLabels(md metadata.Metadata) []string { var ( DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -37,10 +37,9 @@ var ( } DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -49,11 +48,11 @@ var ( } DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Publish %s", msg.Topic())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } + labels = append(labels, ExtractDefaultLabels(msg.Metadata())...) if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) } @@ -61,10 +60,9 @@ var ( } DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -73,11 +71,12 @@ var ( } DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Subscriber %s", msg.Topic())) var labels []interface{} if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } + labels = append(labels, ExtractDefaultLabels(msg.Header())...) + if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) } @@ -85,10 +84,10 @@ var ( } DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) + sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = append(labels, extractLabels(md)) + labels = append(labels, ExtractDefaultLabels(md)...) } if err != nil { sp.SetStatus(tracer.SpanStatusError, err.Error()) @@ -223,23 +222,22 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ } } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "unary", - ), - ) - } + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "unary", + ), + ) defer sp.Finish() - err := ot.Client.Call(ctx, req, rsp, opts...) + err := ot.Client.Call(nctx, req, rsp, opts...) for _, o := range ot.opts.ClientCallObservers { - o(ctx, req, rsp, opts, sp, err) + o(nctx, req, rsp, opts, sp, err) } return err @@ -253,39 +251,36 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie } } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "stream", - ), - ) - } + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "stream", + ), + ) defer sp.Finish() - stream, err := ot.Client.Stream(ctx, req, opts...) + stream, err := ot.Client.Stream(nctx, req, opts...) for _, o := range ot.opts.ClientStreamObservers { - o(ctx, req, opts, stream, sp, err) + o(nctx, req, opts, stream, sp, err) } return stream, err } func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindProducer)) - } + nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer)) defer sp.Finish() - - err := ot.Client.Publish(ctx, msg, opts...) + sp.AddLabels("messaging.destination.name", msg.Topic()) + sp.AddLabels("messaging.operation", "publish") + err := ot.Client.Publish(nctx, msg, opts...) for _, o := range ot.opts.ClientPublishObservers { - o(ctx, msg, opts, sp, err) + o(nctx, msg, opts, sp, err) } return err @@ -304,40 +299,36 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i callType = "stream" } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server", - tracer.WithSpanKind(tracer.SpanKindServer), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", callType, - ), - ) - } - + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindServer), + tracer.WithSpanLabels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", callType, + ), + ) defer sp.Finish() - err := ot.serverHandler(ctx, req, rsp) + err := ot.serverHandler(nctx, req, rsp) for _, o := range ot.opts.ServerHandlerObservers { - o(ctx, req, rsp, sp, err) + o(nctx, req, rsp, sp, err) } return err } func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindConsumer)) - } + nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer)) defer sp.Finish() - - err := ot.serverSubscriber(ctx, msg) + sp.AddLabels("messaging.operation", "process") + sp.AddLabels("messaging.source.name", msg.Topic()) + err := ot.serverSubscriber(nctx, msg) for _, o := range ot.opts.ServerSubscriberObservers { - o(ctx, msg, sp, err) + o(nctx, msg, sp, err) } return err @@ -375,23 +366,23 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client. } } - sp, ok := tracer.SpanFromContext(ctx) - if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", - tracer.WithSpanKind(tracer.SpanKindClient), - tracer.WithSpanLabels( - "rpc.flavor", "rpc", - "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), - "rpc.call_type", "unary", - ), - ) - } + nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels( + "rpc.service", req.Service(), + "rpc.method", req.Method(), + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "unary", + ), + ) + defer sp.Finish() - err := ot.clientCallFunc(ctx, addr, req, rsp, opts) + err := ot.clientCallFunc(nctx, addr, req, rsp, opts) for _, o := range ot.opts.ClientCallFuncObservers { - o(ctx, addr, req, rsp, opts, sp, err) + o(nctx, addr, req, rsp, opts, sp, err) } return err