tracer: update wrapper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		
							
								
								
									
										8
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										8
									
								
								go.mod
									
									
									
									
									
								
							| @@ -8,13 +8,13 @@ require ( | ||||
| 	github.com/imdario/mergo v0.3.15 | ||||
| 	github.com/patrickmn/go-cache v2.1.0+incompatible | ||||
| 	github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 | ||||
| 	golang.org/x/sync v0.1.0 | ||||
| 	golang.org/x/sync v0.3.0 | ||||
| 	golang.org/x/sys v0.7.0 | ||||
| 	google.golang.org/grpc v1.54.0 | ||||
| 	google.golang.org/protobuf v1.30.0 | ||||
| 	google.golang.org/grpc v1.57.0 | ||||
| 	google.golang.org/protobuf v1.31.0 | ||||
| ) | ||||
|  | ||||
| require ( | ||||
| 	github.com/golang/protobuf v1.5.3 // indirect | ||||
| 	google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect | ||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect | ||||
| ) | ||||
|   | ||||
							
								
								
									
										20
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										20
									
								
								go.sum
									
									
									
									
									
								
							| @@ -13,21 +13,21 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR | ||||
| github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= | ||||
| github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= | ||||
| github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= | ||||
| golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= | ||||
| golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= | ||||
| golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||||
| golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= | ||||
| golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= | ||||
| golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= | ||||
| golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= | ||||
| golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||||
| golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= | ||||
| golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= | ||||
| golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||||
| google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= | ||||
| google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= | ||||
| google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= | ||||
| google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= | ||||
| google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= | ||||
| google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= | ||||
| google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= | ||||
| google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= | ||||
| google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= | ||||
| google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= | ||||
| google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= | ||||
| google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= | ||||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||||
|   | ||||
| @@ -19,6 +19,8 @@ var ( | ||||
| 	HeaderTimeout = "Micro-Timeout" | ||||
| 	// HeaderAuthorization specifies Authorization header | ||||
| 	HeaderAuthorization = "Authorization" | ||||
| 	// HeaderXRequestID specifies request id | ||||
| 	HeaderXRequestID = "X-Request-Id" | ||||
| ) | ||||
|  | ||||
| // Metadata is our way of representing request headers internally. | ||||
|   | ||||
| @@ -91,7 +91,9 @@ type SpanOptions struct { | ||||
| } | ||||
|  | ||||
| // EventOptions contains event options | ||||
| type EventOptions struct{} | ||||
| type EventOptions struct { | ||||
| 	Labels []interface{} | ||||
| } | ||||
|  | ||||
| func WithSpanLabels(ls ...interface{}) options.Option { | ||||
| 	return func(src interface{}) error { | ||||
| @@ -110,6 +112,26 @@ func WithSpanLabels(ls ...interface{}) options.Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // EventOption func signature | ||||
| type EventOption func(o *EventOptions) | ||||
|  | ||||
| func WithEventLabels(ls ...interface{}) options.Option { | ||||
| 	return func(src interface{}) error { | ||||
| 		v, err := options.Get(src, ".Labels") | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} else if rutil.IsZero(v) { | ||||
| 			v = reflect.MakeSlice(reflect.TypeOf(v), 0, len(ls)).Interface() | ||||
| 		} | ||||
| 		cv := reflect.ValueOf(v) | ||||
| 		for _, l := range ls { | ||||
| 			reflect.Append(cv, reflect.ValueOf(l)) | ||||
| 		} | ||||
| 		err = options.Set(src, cv, ".Labels") | ||||
| 		return err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func WithSpanKind(k SpanKind) options.Option { | ||||
| 	return func(src interface{}) error { | ||||
| 		return options.Set(src, k, ".Kind") | ||||
|   | ||||
| @@ -7,77 +7,70 @@ import ( | ||||
|  | ||||
| 	"go.unistack.org/micro/v4/client" | ||||
| 	"go.unistack.org/micro/v4/metadata" | ||||
| 	"go.unistack.org/micro/v4/options" | ||||
| 	"go.unistack.org/micro/v4/server" | ||||
| 	"go.unistack.org/micro/v4/tracer" | ||||
| ) | ||||
|  | ||||
| var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID} | ||||
|  | ||||
| func extractLabels(md metadata.Metadata) []string { | ||||
| 	labels := make([]string, 0, 5) | ||||
| 	for _, k := range DefaultHeadersExctract { | ||||
| 		if v, ok := md.Get(k); ok { | ||||
| 			labels = append(labels, k, v) | ||||
| 		} | ||||
| 	} | ||||
| 	return labels | ||||
| } | ||||
|  | ||||
| var ( | ||||
| 	DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { | ||||
| 	DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []options.Option, sp tracer.Span, err error) { | ||||
| 		sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) | ||||
| 		var labels []interface{} | ||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||
| 			labels = make([]interface{}, 0, len(md)+1) | ||||
| 			for k, v := range md { | ||||
| 				labels = append(labels, k, v) | ||||
| 			} | ||||
| 			labels = append(labels, extractLabels(md)) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			labels = append(labels, "error", err.Error()) | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		} | ||||
| 		labels = append(labels, "kind", sp.Kind()) | ||||
| 		sp.SetLabels(labels...) | ||||
| 		sp.AddLabels(labels...) | ||||
| 	} | ||||
|  | ||||
| 	DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { | ||||
| 	DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []options.Option, stream client.Stream, sp tracer.Span, err error) { | ||||
| 		sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method())) | ||||
| 		var labels []interface{} | ||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||
| 			labels = make([]interface{}, 0, len(md)) | ||||
| 			for k, v := range md { | ||||
| 				labels = append(labels, k, v) | ||||
| 			} | ||||
| 			labels = append(labels, extractLabels(md)) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			labels = append(labels, "error", err.Error()) | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		} | ||||
| 		labels = append(labels, "kind", sp.Kind()) | ||||
| 		sp.SetLabels(labels...) | ||||
| 		sp.AddLabels(labels...) | ||||
| 	} | ||||
|  | ||||
| 	DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { | ||||
| 		sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method())) | ||||
| 		var labels []interface{} | ||||
| 		if md, ok := metadata.FromIncomingContext(ctx); ok { | ||||
| 			labels = make([]interface{}, 0, len(md)) | ||||
| 			for k, v := range md { | ||||
| 				labels = append(labels, k, v) | ||||
| 			} | ||||
| 			labels = append(labels, extractLabels(md)) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			labels = append(labels, "error", err.Error()) | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		} | ||||
| 		labels = append(labels, "kind", sp.Kind()) | ||||
| 		sp.SetLabels(labels...) | ||||
| 		sp.AddLabels(labels...) | ||||
| 	} | ||||
|  | ||||
| 	DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { | ||||
| 		sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) | ||||
| 		var labels []interface{} | ||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||
| 			labels = make([]interface{}, 0, len(md)) | ||||
| 			for k, v := range md { | ||||
| 				labels = append(labels, k, v) | ||||
| 			} | ||||
| 			labels = append(labels, extractLabels(md)) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			labels = append(labels, "error", err.Error()) | ||||
| 			sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		} | ||||
| 		labels = append(labels, "kind", sp.Kind()) | ||||
| 		sp.SetLabels(labels...) | ||||
| 		sp.AddLabels(labels...) | ||||
| 	} | ||||
|  | ||||
| 	DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"} | ||||
| @@ -91,8 +84,8 @@ type tWrapper struct { | ||||
| } | ||||
|  | ||||
| type ( | ||||
| 	ClientCallObserver     func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error) | ||||
| 	ClientStreamObserver   func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error) | ||||
| 	ClientCallObserver     func(context.Context, client.Request, interface{}, []options.Option, tracer.Span, error) | ||||
| 	ClientStreamObserver   func(context.Context, client.Request, []options.Option, client.Stream, tracer.Span, error) | ||||
| 	ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error) | ||||
| 	ServerHandlerObserver  func(context.Context, server.Request, interface{}, tracer.Span, error) | ||||
| ) | ||||
| @@ -176,7 +169,7 @@ func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { | ||||
| func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...options.Option) error { | ||||
| 	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) | ||||
| 	for _, ep := range ot.opts.SkipEndpoints { | ||||
| 		if ep == endpoint { | ||||
| @@ -186,7 +179,14 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ | ||||
|  | ||||
| 	sp, ok := tracer.SpanFromContext(ctx) | ||||
| 	if !ok { | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient)) | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", | ||||
| 			tracer.WithSpanKind(tracer.SpanKindClient), | ||||
| 			tracer.WithSpanLabels( | ||||
| 				"rpc.flavor", "rpc", | ||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||
| 				"rpc.call_type", "unary", | ||||
| 			), | ||||
| 		) | ||||
| 	} | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| @@ -199,7 +199,7 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { | ||||
| func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...options.Option) (client.Stream, error) { | ||||
| 	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) | ||||
| 	for _, ep := range ot.opts.SkipEndpoints { | ||||
| 		if ep == endpoint { | ||||
| @@ -209,7 +209,14 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie | ||||
|  | ||||
| 	sp, ok := tracer.SpanFromContext(ctx) | ||||
| 	if !ok { | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient)) | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", | ||||
| 			tracer.WithSpanKind(tracer.SpanKindClient), | ||||
| 			tracer.WithSpanLabels( | ||||
| 				"rpc.flavor", "rpc", | ||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||
| 				"rpc.call_type", "stream", | ||||
| 			), | ||||
| 		) | ||||
| 	} | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| @@ -230,10 +237,23 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	callType := "unary" | ||||
| 	if req.Stream() { | ||||
| 		callType = "stream" | ||||
| 	} | ||||
|  | ||||
| 	sp, ok := tracer.SpanFromContext(ctx) | ||||
| 	if !ok { | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindServer)) | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server", | ||||
| 			tracer.WithSpanKind(tracer.SpanKindServer), | ||||
| 			tracer.WithSpanLabels( | ||||
| 				"rpc.flavor", "rpc", | ||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||
| 				"rpc.call_type", callType, | ||||
| 			), | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	err := ot.serverHandler(ctx, req, rsp) | ||||
| @@ -279,7 +299,14 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client. | ||||
|  | ||||
| 	sp, ok := tracer.SpanFromContext(ctx) | ||||
| 	if !ok { | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient)) | ||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", | ||||
| 			tracer.WithSpanKind(tracer.SpanKindClient), | ||||
| 			tracer.WithSpanLabels( | ||||
| 				"rpc.flavor", "rpc", | ||||
| 				"rpc.call", "/"+req.Service()+"/"+req.Endpoint(), | ||||
| 				"rpc.call_type", "unary", | ||||
| 			), | ||||
| 		) | ||||
| 	} | ||||
| 	defer sp.Finish() | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user