tracer: tweaks for span tags and naming #239
| @@ -3,6 +3,8 @@ package tracer // import "go.unistack.org/micro/v3/tracer" | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 	"sort" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // DefaultTracer is the global default tracer | // DefaultTracer is the global default tracer | ||||||
| @@ -42,3 +44,37 @@ type Span interface { | |||||||
| 	// Kind returns span kind | 	// Kind returns span kind | ||||||
| 	Kind() SpanKind | 	Kind() SpanKind | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // sort labels alphabeticaly by label name | ||||||
|  | type byKey []interface{} | ||||||
|  |  | ||||||
|  | func (k byKey) Len() int           { return len(k) / 2 } | ||||||
|  | func (k byKey) Less(i, j int) bool { return fmt.Sprintf("%s", k[i*2]) < fmt.Sprintf("%s", k[j*2]) } | ||||||
|  | func (k byKey) Swap(i, j int) { | ||||||
|  | 	k[i*2], k[j*2] = k[j*2], k[i*2] | ||||||
|  | 	k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1] | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func UniqLabels(labels []interface{}) []interface{} { | ||||||
|  | 	if len(labels)%2 == 1 { | ||||||
|  | 		labels = labels[:len(labels)-1] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(labels) > 2 { | ||||||
|  | 		sort.Sort(byKey(labels)) | ||||||
|  |  | ||||||
|  | 		idx := 0 | ||||||
|  | 		for { | ||||||
|  | 			if labels[idx] == labels[idx+2] { | ||||||
|  | 				copy(labels[idx:], labels[idx+2:]) | ||||||
|  | 				labels = labels[:len(labels)-2] | ||||||
|  | 			} else { | ||||||
|  | 				idx += 2 | ||||||
|  | 			} | ||||||
|  | 			if idx+2 >= len(labels) { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return labels | ||||||
|  | } | ||||||
|   | |||||||
							
								
								
									
										13
									
								
								tracer/tracer_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								tracer/tracer_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,13 @@ | |||||||
|  | package tracer | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"testing" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestUniqLabels(t *testing.T) { | ||||||
|  | 	labels := []interface{}{"key1", "val1", "key1", "val2"} | ||||||
|  | 	labels = UniqLabels(labels) | ||||||
|  | 	if labels[1] != "val2" { | ||||||
|  | 		t.Fatalf("UniqLabels not works") | ||||||
|  | 	} | ||||||
|  | } | ||||||
| @@ -4,6 +4,7 @@ package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper" | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  |  | ||||||
| 	"go.unistack.org/micro/v3/client" | 	"go.unistack.org/micro/v3/client" | ||||||
| 	"go.unistack.org/micro/v3/metadata" | 	"go.unistack.org/micro/v3/metadata" | ||||||
| @@ -11,13 +12,13 @@ import ( | |||||||
| 	"go.unistack.org/micro/v3/tracer" | 	"go.unistack.org/micro/v3/tracer" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID} | var DefaultHeadersExctract = []string{metadata.HeaderXRequestID} | ||||||
|  |  | ||||||
| func extractLabels(md metadata.Metadata) []string { | func ExtractDefaultLabels(md metadata.Metadata) []interface{} { | ||||||
| 	labels := make([]string, 0, 5) | 	labels := make([]interface{}, 0, len(DefaultHeadersExctract)) | ||||||
| 	for _, k := range DefaultHeadersExctract { | 	for _, k := range DefaultHeadersExctract { | ||||||
| 		if v, ok := md.Get(k); ok { | 		if v, ok := md.Get(k); ok { | ||||||
| 			labels = append(labels, k, v) | 			labels = append(labels, strings.ToLower(k), v) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return labels | 	return labels | ||||||
| @@ -25,10 +26,9 @@ func extractLabels(md metadata.Metadata) []string { | |||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { | 	DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) |  | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = append(labels, extractLabels(md)) | 			labels = append(labels, ExtractDefaultLabels(md)...) | ||||||
| 		} | 		} | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
| @@ -37,10 +37,9 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { | 	DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method())) |  | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = append(labels, extractLabels(md)) | 			labels = append(labels, ExtractDefaultLabels(md)...) | ||||||
| 		} | 		} | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
| @@ -49,11 +48,11 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { | 	DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Publish %s", msg.Topic())) |  | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = append(labels, extractLabels(md)) | 			labels = append(labels, ExtractDefaultLabels(md)...) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, ExtractDefaultLabels(msg.Metadata())...) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
| 		} | 		} | ||||||
| @@ -61,10 +60,9 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { | 	DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method())) |  | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromIncomingContext(ctx); ok { | 		if md, ok := metadata.FromIncomingContext(ctx); ok { | ||||||
| 			labels = append(labels, extractLabels(md)) | 			labels = append(labels, ExtractDefaultLabels(md)...) | ||||||
| 		} | 		} | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
| @@ -73,11 +71,12 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { | 	DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Subscriber %s", msg.Topic())) |  | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromIncomingContext(ctx); ok { | 		if md, ok := metadata.FromIncomingContext(ctx); ok { | ||||||
| 			labels = append(labels, extractLabels(md)) | 			labels = append(labels, ExtractDefaultLabels(md)...) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, ExtractDefaultLabels(msg.Header())...) | ||||||
|  |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
| 		} | 		} | ||||||
| @@ -85,10 +84,10 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { | 	DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) | 		sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = append(labels, extractLabels(md)) | 			labels = append(labels, ExtractDefaultLabels(md)...) | ||||||
| 		} | 		} | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
| @@ -223,23 +222,22 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), | ||||||
| 	if !ok { | 		tracer.WithSpanKind(tracer.SpanKindClient), | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", | 		tracer.WithSpanLabels( | ||||||
| 			tracer.WithSpanKind(tracer.SpanKindClient), | 			"rpc.service", req.Service(), | ||||||
| 			tracer.WithSpanLabels( | 			"rpc.method", req.Method(), | ||||||
| 				"rpc.flavor", "rpc", | 			"rpc.flavor", "rpc", | ||||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | 			"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||||
| 				"rpc.call_type", "unary", | 			"rpc.call_type", "unary", | ||||||
| 			), | 		), | ||||||
| 		) | 	) | ||||||
| 	} |  | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| 	err := ot.Client.Call(ctx, req, rsp, opts...) | 	err := ot.Client.Call(nctx, req, rsp, opts...) | ||||||
|  |  | ||||||
| 	for _, o := range ot.opts.ClientCallObservers { | 	for _, o := range ot.opts.ClientCallObservers { | ||||||
| 		o(ctx, req, rsp, opts, sp, err) | 		o(nctx, req, rsp, opts, sp, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| @@ -253,39 +251,36 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), | ||||||
| 	if !ok { | 		tracer.WithSpanKind(tracer.SpanKindClient), | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", | 		tracer.WithSpanLabels( | ||||||
| 			tracer.WithSpanKind(tracer.SpanKindClient), | 			"rpc.service", req.Service(), | ||||||
| 			tracer.WithSpanLabels( | 			"rpc.method", req.Method(), | ||||||
| 				"rpc.flavor", "rpc", | 			"rpc.flavor", "rpc", | ||||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | 			"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||||
| 				"rpc.call_type", "stream", | 			"rpc.call_type", "stream", | ||||||
| 			), | 		), | ||||||
| 		) | 	) | ||||||
| 	} |  | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| 	stream, err := ot.Client.Stream(ctx, req, opts...) | 	stream, err := ot.Client.Stream(nctx, req, opts...) | ||||||
|  |  | ||||||
| 	for _, o := range ot.opts.ClientStreamObservers { | 	for _, o := range ot.opts.ClientStreamObservers { | ||||||
| 		o(ctx, req, opts, stream, sp, err) | 		o(nctx, req, opts, stream, sp, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return stream, err | 	return stream, err | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { | func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer)) | ||||||
| 	if !ok { |  | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindProducer)) |  | ||||||
| 	} |  | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  | 	sp.AddLabels("messaging.destination.name", msg.Topic()) | ||||||
| 	err := ot.Client.Publish(ctx, msg, opts...) | 	sp.AddLabels("messaging.operation", "publish") | ||||||
|  | 	err := ot.Client.Publish(nctx, msg, opts...) | ||||||
|  |  | ||||||
| 	for _, o := range ot.opts.ClientPublishObservers { | 	for _, o := range ot.opts.ClientPublishObservers { | ||||||
| 		o(ctx, msg, opts, sp, err) | 		o(nctx, msg, opts, sp, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| @@ -304,40 +299,36 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i | |||||||
| 		callType = "stream" | 		callType = "stream" | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()), | ||||||
| 	if !ok { | 		tracer.WithSpanKind(tracer.SpanKindServer), | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server", | 		tracer.WithSpanLabels( | ||||||
| 			tracer.WithSpanKind(tracer.SpanKindServer), | 			"rpc.service", req.Service(), | ||||||
| 			tracer.WithSpanLabels( | 			"rpc.method", req.Method(), | ||||||
| 				"rpc.flavor", "rpc", | 			"rpc.flavor", "rpc", | ||||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | 			"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||||
| 				"rpc.call_type", callType, | 			"rpc.call_type", callType, | ||||||
| 			), | 		), | ||||||
| 		) | 	) | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| 	err := ot.serverHandler(ctx, req, rsp) | 	err := ot.serverHandler(nctx, req, rsp) | ||||||
|  |  | ||||||
| 	for _, o := range ot.opts.ServerHandlerObservers { | 	for _, o := range ot.opts.ServerHandlerObservers { | ||||||
| 		o(ctx, req, rsp, sp, err) | 		o(nctx, req, rsp, sp, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { | func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer)) | ||||||
| 	if !ok { |  | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindConsumer)) |  | ||||||
| 	} |  | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  | 	sp.AddLabels("messaging.operation", "process") | ||||||
| 	err := ot.serverSubscriber(ctx, msg) | 	sp.AddLabels("messaging.source.name", msg.Topic()) | ||||||
|  | 	err := ot.serverSubscriber(nctx, msg) | ||||||
|  |  | ||||||
| 	for _, o := range ot.opts.ServerSubscriberObservers { | 	for _, o := range ot.opts.ServerSubscriberObservers { | ||||||
| 		o(ctx, msg, sp, err) | 		o(nctx, msg, sp, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| @@ -375,23 +366,23 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client. | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()), | ||||||
| 	if !ok { | 		tracer.WithSpanKind(tracer.SpanKindClient), | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", | 		tracer.WithSpanLabels( | ||||||
| 			tracer.WithSpanKind(tracer.SpanKindClient), | 			"rpc.service", req.Service(), | ||||||
| 			tracer.WithSpanLabels( | 			"rpc.method", req.Method(), | ||||||
| 				"rpc.flavor", "rpc", | 			"rpc.flavor", "rpc", | ||||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | 			"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||||
| 				"rpc.call_type", "unary", | 			"rpc.call_type", "unary", | ||||||
| 			), | 		), | ||||||
| 		) | 	) | ||||||
| 	} |  | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| 	err := ot.clientCallFunc(ctx, addr, req, rsp, opts) | 	err := ot.clientCallFunc(nctx, addr, req, rsp, opts) | ||||||
|  |  | ||||||
| 	for _, o := range ot.opts.ClientCallFuncObservers { | 	for _, o := range ot.opts.ClientCallFuncObservers { | ||||||
| 		o(ctx, addr, req, rsp, opts, sp, err) | 		o(nctx, addr, req, rsp, opts, sp, err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user