tracer: improve
This commit is contained in:
		
							
								
								
									
										5
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								go.mod
									
									
									
									
									
								
							| @@ -9,12 +9,13 @@ require ( | |||||||
| 	github.com/patrickmn/go-cache v2.1.0+incompatible | 	github.com/patrickmn/go-cache v2.1.0+incompatible | ||||||
| 	github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 | 	github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 | ||||||
| 	golang.org/x/sync v0.3.0 | 	golang.org/x/sync v0.3.0 | ||||||
| 	golang.org/x/sys v0.7.0 | 	golang.org/x/sys v0.11.0 | ||||||
| 	google.golang.org/grpc v1.57.0 | 	google.golang.org/grpc v1.57.0 | ||||||
| 	google.golang.org/protobuf v1.31.0 | 	google.golang.org/protobuf v1.31.0 | ||||||
| ) | ) | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	github.com/golang/protobuf v1.5.3 // indirect | 	github.com/golang/protobuf v1.5.3 // indirect | ||||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect | 	golang.org/x/net v0.14.0 // indirect | ||||||
|  | 	google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect | ||||||
| ) | ) | ||||||
|   | |||||||
							
								
								
									
										13
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								go.sum
									
									
									
									
									
								
							| @@ -13,15 +13,16 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR | |||||||
| github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= | github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= | ||||||
| github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= | github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= | ||||||
| github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= | github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= | ||||||
| golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= | golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= | ||||||
|  | golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= | ||||||
| golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= | golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= | ||||||
| golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= | golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= | ||||||
| golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= | golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= | ||||||
| golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||||||
| golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= | golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= | ||||||
| golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= | google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= | ||||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= | google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= | ||||||
| google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= | google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= | ||||||
| google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= | google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= | ||||||
| google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= | google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= | ||||||
|   | |||||||
| @@ -10,18 +10,26 @@ var _ Tracer = (*noopTracer)(nil) | |||||||
|  |  | ||||||
| type noopTracer struct { | type noopTracer struct { | ||||||
| 	opts  Options | 	opts  Options | ||||||
|  | 	spans []Span | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *noopTracer) Spans() []Span { | ||||||
|  | 	return t.spans | ||||||
| } | } | ||||||
|  |  | ||||||
| func (t *noopTracer) Start(ctx context.Context, name string, opts ...options.Option) (context.Context, Span) { | func (t *noopTracer) Start(ctx context.Context, name string, opts ...options.Option) (context.Context, Span) { | ||||||
|  | 	options := NewSpanOptions(opts...) | ||||||
| 	span := &noopSpan{ | 	span := &noopSpan{ | ||||||
| 		name:   name, | 		name:   name, | ||||||
| 		ctx:    ctx, | 		ctx:    ctx, | ||||||
| 		tracer: t, | 		tracer: t, | ||||||
| 		opts:   NewSpanOptions(opts...), | 		labels: options.Labels, | ||||||
|  | 		kind:   options.Kind, | ||||||
| 	} | 	} | ||||||
| 	if span.ctx == nil { | 	if span.ctx == nil { | ||||||
| 		span.ctx = context.Background() | 		span.ctx = context.Background() | ||||||
| 	} | 	} | ||||||
|  | 	t.spans = append(t.spans, span) | ||||||
| 	return NewSpanContext(ctx, span), span | 	return NewSpanContext(ctx, span), span | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -40,13 +48,21 @@ func (t *noopTracer) Name() string { | |||||||
| 	return t.opts.Name | 	return t.opts.Name | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type noopEvent struct { | ||||||
|  | 	name   string | ||||||
|  | 	labels []interface{} | ||||||
|  | } | ||||||
|  |  | ||||||
| type noopSpan struct { | type noopSpan struct { | ||||||
| 	ctx       context.Context | 	ctx       context.Context | ||||||
| 	tracer    Tracer | 	tracer    Tracer | ||||||
| 	name      string | 	name      string | ||||||
| 	opts      SpanOptions |  | ||||||
| 	status    SpanStatus |  | ||||||
| 	statusMsg string | 	statusMsg string | ||||||
|  | 	events    []*noopEvent | ||||||
|  | 	labels    []interface{} | ||||||
|  | 	logs      []interface{} | ||||||
|  | 	kind      SpanKind | ||||||
|  | 	status    SpanStatus | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) Finish(opts ...options.Option) { | func (s *noopSpan) Finish(opts ...options.Option) { | ||||||
| @@ -61,22 +77,24 @@ func (s *noopSpan) Tracer() Tracer { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) AddEvent(name string, opts ...options.Option) { | func (s *noopSpan) AddEvent(name string, opts ...options.Option) { | ||||||
|  | 	options := NewEventOptions(opts...) | ||||||
|  | 	s.events = append(s.events, &noopEvent{name: name, labels: options.Labels}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) SetName(name string) { | func (s *noopSpan) SetName(name string) { | ||||||
| 	s.name = name | 	s.name = name | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) SetLabels(labels ...interface{}) { | func (s *noopSpan) AddLogs(kv ...interface{}) { | ||||||
| 	s.opts.Labels = labels | 	s.logs = append(s.logs, kv...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) AddLabels(labels ...interface{}) { | func (s *noopSpan) AddLabels(kv ...interface{}) { | ||||||
| 	s.opts.Labels = append(s.opts.Labels, labels...) | 	s.labels = append(s.labels, kv...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) Kind() SpanKind { | func (s *noopSpan) Kind() SpanKind { | ||||||
| 	return s.opts.Kind | 	return s.kind | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *noopSpan) Status() (SpanStatus, string) { | func (s *noopSpan) Status() (SpanStatus, string) { | ||||||
|   | |||||||
| @@ -93,14 +93,6 @@ type EventOptions struct { | |||||||
| 	Labels []interface{} | 	Labels []interface{} | ||||||
| } | } | ||||||
|  |  | ||||||
| func WithEventLabels(labels ...interface{}) options.Option { |  | ||||||
| 	return options.Labels(labels...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func WithSpanLabels(labels ...interface{}) options.Option { |  | ||||||
| 	return options.Labels(labels...) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func WithSpanKind(k SpanKind) options.Option { | func WithSpanKind(k SpanKind) options.Option { | ||||||
| 	return func(src interface{}) error { | 	return func(src interface{}) error { | ||||||
| 		return options.Set(src, k, ".Kind") | 		return options.Set(src, k, ".Kind") | ||||||
| @@ -128,6 +120,15 @@ func NewSpanOptions(opts ...options.Option) SpanOptions { | |||||||
| 	return options | 	return options | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // NewEventOptions returns default EventOptions | ||||||
|  | func NewEventOptions(opts ...options.Option) EventOptions { | ||||||
|  | 	options := EventOptions{} | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  | 	return options | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewOptions returns default options | // NewOptions returns default options | ||||||
| func NewOptions(opts ...options.Option) Options { | func NewOptions(opts ...options.Option) Options { | ||||||
| 	options := Options{ | 	options := Options{ | ||||||
|   | |||||||
| @@ -29,8 +29,6 @@ type Span interface { | |||||||
| 	Tracer() Tracer | 	Tracer() Tracer | ||||||
| 	// Finish complete and send span | 	// Finish complete and send span | ||||||
| 	Finish(opts ...options.Option) | 	Finish(opts ...options.Option) | ||||||
| 	// AddEvent add event to span |  | ||||||
| 	AddEvent(name string, opts ...options.Option) |  | ||||||
| 	// Context return context with span | 	// Context return context with span | ||||||
| 	Context() context.Context | 	Context() context.Context | ||||||
| 	// SetName set the span name | 	// SetName set the span name | ||||||
| @@ -39,10 +37,12 @@ type Span interface { | |||||||
| 	SetStatus(status SpanStatus, msg string) | 	SetStatus(status SpanStatus, msg string) | ||||||
| 	// Status returns span status and msg | 	// Status returns span status and msg | ||||||
| 	Status() (SpanStatus, string) | 	Status() (SpanStatus, string) | ||||||
| 	// SetLabels set the span labels | 	// AddLabels append labels to span | ||||||
| 	SetLabels(labels ...interface{}) | 	AddLabels(kv ...interface{}) | ||||||
| 	// AddLabels append the span labels | 	// AddEvent append event to span | ||||||
| 	AddLabels(labels ...interface{}) | 	AddEvent(name string, opts ...options.Option) | ||||||
|  | 	// AddLogs append logs to span | ||||||
|  | 	AddLogs(kv ...interface{}) | ||||||
| 	// Kind returns span kind | 	// Kind returns span kind | ||||||
| 	Kind() SpanKind | 	Kind() SpanKind | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										256
									
								
								util/grpc/tracer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										256
									
								
								util/grpc/tracer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,256 @@ | |||||||
|  | package grpc_util | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"net" | ||||||
|  | 	"strconv" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync/atomic" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"go.unistack.org/micro/v4/options" | ||||||
|  | 	"go.unistack.org/micro/v4/tracer" | ||||||
|  | 	grpc_codes "google.golang.org/grpc/codes" | ||||||
|  | 	"google.golang.org/grpc/peer" | ||||||
|  | 	"google.golang.org/grpc/stats" | ||||||
|  | 	"google.golang.org/grpc/status" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type gRPCContextKey struct{} | ||||||
|  |  | ||||||
|  | type gRPCContext struct { | ||||||
|  | 	messagesReceived int64 | ||||||
|  | 	messagesSent     int64 | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Options struct { | ||||||
|  | 	Tracer tracer.Tracer | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewServerHandler creates a stats.Handler for gRPC server. | ||||||
|  | func NewServerHandler(tr tracer.Tracer) stats.Handler { | ||||||
|  | 	h := &serverHandler{ | ||||||
|  | 		tr: tr, | ||||||
|  | 	} | ||||||
|  | 	return h | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type serverHandler struct { | ||||||
|  | 	tr tracer.Tracer | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // TagRPC can attach some information to the given context. | ||||||
|  | func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { | ||||||
|  | 	name, attrs := parseFullMethod(info.FullMethodName) | ||||||
|  | 	attrs = append(attrs, "rpc.system", "grpc") | ||||||
|  | 	ctx, _ = h.tr.Start( | ||||||
|  | 		ctx, | ||||||
|  | 		name, | ||||||
|  | 		tracer.WithSpanKind(tracer.SpanKindServer), | ||||||
|  | 		options.Labels(attrs...), | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	gctx := gRPCContext{} | ||||||
|  | 	return context.WithValue(ctx, gRPCContextKey{}, &gctx) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // HandleRPC processes the RPC stats. | ||||||
|  | func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||||||
|  | 	handleRPC(ctx, rs) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // TagConn can attach some information to the given context. | ||||||
|  | func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { | ||||||
|  | 	if span, ok := tracer.SpanFromContext(ctx); ok { | ||||||
|  | 		attrs := peerAttr(peerFromCtx(ctx)) | ||||||
|  | 		span.AddLabels(attrs...) | ||||||
|  | 	} | ||||||
|  | 	return ctx | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // HandleConn processes the Conn stats. | ||||||
|  | func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type clientHandler struct { | ||||||
|  | 	tr tracer.Tracer | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewClientHandler creates a stats.Handler for gRPC client. | ||||||
|  | func NewClientHandler(tr tracer.Tracer) stats.Handler { | ||||||
|  | 	h := &clientHandler{ | ||||||
|  | 		tr: tr, | ||||||
|  | 	} | ||||||
|  | 	return h | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // TagRPC can attach some information to the given context. | ||||||
|  | func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { | ||||||
|  | 	name, attrs := parseFullMethod(info.FullMethodName) | ||||||
|  | 	attrs = append(attrs, "rpc.system", "grpc", "rpc.flavor", "grpc", "rpc.call", info.FullMethodName) | ||||||
|  | 	ctx, _ = h.tr.Start( | ||||||
|  | 		ctx, | ||||||
|  | 		name, | ||||||
|  | 		tracer.WithSpanKind(tracer.SpanKindClient), | ||||||
|  | 		options.Labels(attrs...), | ||||||
|  | 	) | ||||||
|  |  | ||||||
|  | 	gctx := gRPCContext{} | ||||||
|  |  | ||||||
|  | 	return context.WithValue(ctx, gRPCContextKey{}, &gctx) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // HandleRPC processes the RPC stats. | ||||||
|  | func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { | ||||||
|  | 	handleRPC(ctx, rs) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // TagConn can attach some information to the given context. | ||||||
|  | func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { | ||||||
|  | 	// TODO | ||||||
|  | 	if span, ok := tracer.SpanFromContext(ctx); ok { | ||||||
|  | 		attrs := peerAttr(cti.RemoteAddr.String()) | ||||||
|  | 		span.AddLabels(attrs...) | ||||||
|  | 	} | ||||||
|  | 	return ctx | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // HandleConn processes the Conn stats. | ||||||
|  | func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { | ||||||
|  | 	// no-op | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func handleRPC(ctx context.Context, rs stats.RPCStats) { | ||||||
|  | 	span, ok := tracer.SpanFromContext(ctx) | ||||||
|  | 	gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) | ||||||
|  | 	var messageID int64 | ||||||
|  | 	if rs.IsClient() { | ||||||
|  | 		span.AddLabels("span.kind", "client") | ||||||
|  | 	} else { | ||||||
|  | 		span.AddLabels("span.kind", "server") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	switch rs := rs.(type) { | ||||||
|  | 	case *stats.Begin: | ||||||
|  | 		if rs.IsClientStream || rs.IsServerStream { | ||||||
|  | 			span.AddLabels("rpc.call_type", "stream") | ||||||
|  | 		} else { | ||||||
|  | 			span.AddLabels("rpc.call_type", "unary") | ||||||
|  | 		} | ||||||
|  | 		span.AddEvent("message", | ||||||
|  | 			options.Labels( | ||||||
|  | 				"message.begin_time", rs.BeginTime.Format(time.RFC3339), | ||||||
|  | 			), | ||||||
|  | 		) | ||||||
|  | 	case *stats.InPayload: | ||||||
|  | 		if gctx != nil { | ||||||
|  | 			messageID = atomic.AddInt64(&gctx.messagesReceived, 1) | ||||||
|  | 		} | ||||||
|  | 		if ok { | ||||||
|  | 			span.AddEvent("message", | ||||||
|  | 				options.Labels( | ||||||
|  | 					"message.recv_time", rs.RecvTime.Format(time.RFC3339), | ||||||
|  | 					"message.type", "RECEIVED", | ||||||
|  | 					"message.id", messageID, | ||||||
|  | 					"message.compressed_size", rs.CompressedLength, | ||||||
|  | 					"message.uncompressed_size", rs.Length, | ||||||
|  | 				), | ||||||
|  | 			) | ||||||
|  | 		} | ||||||
|  | 	case *stats.OutPayload: | ||||||
|  | 		if gctx != nil { | ||||||
|  | 			messageID = atomic.AddInt64(&gctx.messagesSent, 1) | ||||||
|  | 		} | ||||||
|  | 		if ok { | ||||||
|  | 			span.AddEvent("message", | ||||||
|  | 				options.Labels( | ||||||
|  | 					"message.sent_time", rs.SentTime.Format(time.RFC3339), | ||||||
|  | 					"message.type", "SENT", | ||||||
|  | 					"message.id", messageID, | ||||||
|  | 					"message.compressed_size", rs.CompressedLength, | ||||||
|  | 					"message.uncompressed_size", rs.Length, | ||||||
|  | 				), | ||||||
|  | 			) | ||||||
|  | 		} | ||||||
|  | 	case *stats.End: | ||||||
|  | 		if ok { | ||||||
|  | 			span.AddEvent("message", | ||||||
|  | 				options.Labels( | ||||||
|  | 					"message.begin_time", rs.BeginTime.Format(time.RFC3339), | ||||||
|  | 					"message.end_time", rs.EndTime.Format(time.RFC3339), | ||||||
|  | 				), | ||||||
|  | 			) | ||||||
|  | 			if rs.Error != nil { | ||||||
|  | 				s, _ := status.FromError(rs.Error) | ||||||
|  | 				span.SetStatus(tracer.SpanStatusError, s.Message()) | ||||||
|  | 				span.AddLabels("rpc.grpc.status_code", s.Code()) | ||||||
|  | 			} else { | ||||||
|  | 				span.AddLabels("rpc.grpc.status_code", grpc_codes.OK) | ||||||
|  | 			} | ||||||
|  | 			span.Finish() | ||||||
|  | 		} | ||||||
|  | 	default: | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func parseFullMethod(fullMethod string) (string, []interface{}) { | ||||||
|  | 	if !strings.HasPrefix(fullMethod, "/") { | ||||||
|  | 		// Invalid format, does not follow `/package.service/method`. | ||||||
|  | 		return fullMethod, nil | ||||||
|  | 	} | ||||||
|  | 	name := fullMethod[1:] | ||||||
|  | 	pos := strings.LastIndex(name, "/") | ||||||
|  | 	if pos < 0 { | ||||||
|  | 		// Invalid format, does not follow `/package.service/method`. | ||||||
|  | 		return name, nil | ||||||
|  | 	} | ||||||
|  | 	service, method := name[:pos], name[pos+1:] | ||||||
|  |  | ||||||
|  | 	var attrs []interface{} | ||||||
|  | 	if service != "" { | ||||||
|  | 		attrs = append(attrs, "rpc.service", service) | ||||||
|  | 	} | ||||||
|  | 	if method != "" { | ||||||
|  | 		attrs = append(attrs, "rpc.method", method) | ||||||
|  | 	} | ||||||
|  | 	return name, attrs | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func peerAttr(addr string) []interface{} { | ||||||
|  | 	host, p, err := net.SplitHostPort(addr) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if host == "" { | ||||||
|  | 		host = "127.0.0.1" | ||||||
|  | 	} | ||||||
|  | 	port, err := strconv.Atoi(p) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var attr []interface{} | ||||||
|  | 	if ip := net.ParseIP(host); ip != nil { | ||||||
|  | 		attr = []interface{}{ | ||||||
|  | 			"net.sock.peer.addr", host, | ||||||
|  | 			"net.sock.peer.port", port, | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		attr = []interface{}{ | ||||||
|  | 			"net.peer.name", host, | ||||||
|  | 			"net.peer.port", port, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return attr | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func peerFromCtx(ctx context.Context) string { | ||||||
|  | 	p, ok := peer.FromContext(ctx) | ||||||
|  | 	if !ok { | ||||||
|  | 		return "" | ||||||
|  | 	} | ||||||
|  | 	return p.Addr.String() | ||||||
|  | } | ||||||
							
								
								
									
										254
									
								
								util/http/clienttracer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										254
									
								
								util/http/clienttracer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,254 @@ | |||||||
|  | // | ||||||
|  | // Copyright The OpenTelemetry Authors | ||||||
|  | // | ||||||
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | // you may not use this file except in compliance with the License. | ||||||
|  | // You may obtain a copy of the License at | ||||||
|  | // | ||||||
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  | // | ||||||
|  | // Unless required by applicable law or agreed to in writing, software | ||||||
|  | // distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | // See the License for the specific language governing permissions and | ||||||
|  | // limitations under the License. | ||||||
|  |  | ||||||
|  | package http | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"crypto/tls" | ||||||
|  | 	"net/http/httptrace" | ||||||
|  | 	"net/textproto" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	"go.unistack.org/micro/v4/options" | ||||||
|  | 	"go.unistack.org/micro/v4/tracer" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	httpStatus     = "http.status" | ||||||
|  | 	httpHeaderMIME = "http.mime" | ||||||
|  | 	httpRemoteAddr = "http.remote" | ||||||
|  | 	httpLocalAddr  = "http.local" | ||||||
|  | 	httpHost       = "http.host" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var hookMap = map[string]string{ | ||||||
|  | 	"http.dns":     "http.getconn", | ||||||
|  | 	"http.connect": "http.getconn", | ||||||
|  | 	"http.tls":     "http.getconn", | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func parentHook(hook string) string { | ||||||
|  | 	if strings.HasPrefix(hook, "http.connect") { | ||||||
|  | 		return hookMap["http.connect"] | ||||||
|  | 	} | ||||||
|  | 	return hookMap[hook] | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type clientTracer struct { | ||||||
|  | 	context.Context | ||||||
|  | 	tr          tracer.Tracer | ||||||
|  | 	activeHooks map[string]context.Context | ||||||
|  | 	root        tracer.Span | ||||||
|  | 	mtx         sync.Mutex | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewClientTrace(ctx context.Context, tr tracer.Tracer) *httptrace.ClientTrace { | ||||||
|  | 	ct := &clientTracer{ | ||||||
|  | 		Context:     ctx, | ||||||
|  | 		activeHooks: make(map[string]context.Context), | ||||||
|  | 		tr:          tr, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &httptrace.ClientTrace{ | ||||||
|  | 		GetConn:              ct.getConn, | ||||||
|  | 		GotConn:              ct.gotConn, | ||||||
|  | 		PutIdleConn:          ct.putIdleConn, | ||||||
|  | 		GotFirstResponseByte: ct.gotFirstResponseByte, | ||||||
|  | 		Got100Continue:       ct.got100Continue, | ||||||
|  | 		Got1xxResponse:       ct.got1xxResponse, | ||||||
|  | 		DNSStart:             ct.dnsStart, | ||||||
|  | 		DNSDone:              ct.dnsDone, | ||||||
|  | 		ConnectStart:         ct.connectStart, | ||||||
|  | 		ConnectDone:          ct.connectDone, | ||||||
|  | 		TLSHandshakeStart:    ct.tlsHandshakeStart, | ||||||
|  | 		TLSHandshakeDone:     ct.tlsHandshakeDone, | ||||||
|  | 		WroteHeaderField:     ct.wroteHeaderField, | ||||||
|  | 		WroteHeaders:         ct.wroteHeaders, | ||||||
|  | 		Wait100Continue:      ct.wait100Continue, | ||||||
|  | 		WroteRequest:         ct.wroteRequest, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) start(hook, spanName string, attrs ...interface{}) { | ||||||
|  | 	ct.mtx.Lock() | ||||||
|  | 	defer ct.mtx.Unlock() | ||||||
|  |  | ||||||
|  | 	if hookCtx, found := ct.activeHooks[hook]; !found { | ||||||
|  | 		var sp tracer.Span | ||||||
|  | 		ct.activeHooks[hook], sp = ct.tr.Start(ct.getParentContext(hook), spanName, options.Labels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient)) | ||||||
|  | 		if ct.root == nil { | ||||||
|  | 			ct.root = sp | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		// end was called before start finished, add the start attributes and end the span here | ||||||
|  | 		if span, ok := tracer.SpanFromContext(hookCtx); ok { | ||||||
|  | 			span.AddLabels(attrs...) | ||||||
|  | 			span.Finish() | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		delete(ct.activeHooks, hook) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) end(hook string, err error, attrs ...interface{}) { | ||||||
|  | 	ct.mtx.Lock() | ||||||
|  | 	defer ct.mtx.Unlock() | ||||||
|  | 	if ctx, ok := ct.activeHooks[hook]; ok { // nolint:nestif | ||||||
|  | 		if span, ok := tracer.SpanFromContext(ctx); ok { | ||||||
|  | 			if err != nil { | ||||||
|  | 				span.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
|  | 			} | ||||||
|  | 			span.AddLabels(attrs...) | ||||||
|  | 			span.Finish() | ||||||
|  | 		} | ||||||
|  | 		delete(ct.activeHooks, hook) | ||||||
|  | 	} else { | ||||||
|  | 		// start is not finished before end is called. | ||||||
|  | 		// Start a span here with the ending attributes that will be finished when start finishes. | ||||||
|  | 		// Yes, it's backwards. v0v | ||||||
|  | 		ctx, span := ct.tr.Start(ct.getParentContext(hook), hook, options.Labels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient)) | ||||||
|  | 		if err != nil { | ||||||
|  | 			span.SetStatus(tracer.SpanStatusError, err.Error()) | ||||||
|  | 		} | ||||||
|  | 		ct.activeHooks[hook] = ctx | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) getParentContext(hook string) context.Context { | ||||||
|  | 	ctx, ok := ct.activeHooks[parentHook(hook)] | ||||||
|  | 	if !ok { | ||||||
|  | 		return ct.Context | ||||||
|  | 	} | ||||||
|  | 	return ctx | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) span(hook string) (tracer.Span, bool) { | ||||||
|  | 	ct.mtx.Lock() | ||||||
|  | 	defer ct.mtx.Unlock() | ||||||
|  | 	if ctx, ok := ct.activeHooks[hook]; ok { | ||||||
|  | 		return tracer.SpanFromContext(ctx) | ||||||
|  | 	} | ||||||
|  | 	return nil, false | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) getConn(host string) { | ||||||
|  | 	ct.start("http.getconn", "http.getconn", httpHost, host) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) { | ||||||
|  | 	ct.end("http.getconn", | ||||||
|  | 		nil, | ||||||
|  | 		httpRemoteAddr, info.Conn.RemoteAddr().String(), | ||||||
|  | 		httpLocalAddr, info.Conn.LocalAddr().String(), | ||||||
|  | 	) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) putIdleConn(err error) { | ||||||
|  | 	ct.end("http.receive", err) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) gotFirstResponseByte() { | ||||||
|  | 	ct.start("http.receive", "http.receive") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) { | ||||||
|  | 	ct.start("http.dns", "http.dns", httpHost, info.Host) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) { | ||||||
|  | 	ct.end("http.dns", info.Err) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) connectStart(network, addr string) { | ||||||
|  | 	_ = network | ||||||
|  | 	ct.start("http.connect."+addr, "http.connect", httpRemoteAddr, addr) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) connectDone(network, addr string, err error) { | ||||||
|  | 	_ = network | ||||||
|  | 	ct.end("http.connect."+addr, err) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) tlsHandshakeStart() { | ||||||
|  | 	ct.start("http.tls", "http.tls") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) { | ||||||
|  | 	ct.end("http.tls", err) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) wroteHeaderField(k string, v []string) { | ||||||
|  | 	if sp, ok := ct.span("http.headers"); !ok || sp == nil { | ||||||
|  | 		ct.start("http.headers", "http.headers") | ||||||
|  | 	} | ||||||
|  | 	ct.root.AddLabels("http."+strings.ToLower(k), sliceToString(v)) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) wroteHeaders() { | ||||||
|  | 	ct.start("http.send", "http.send") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) { | ||||||
|  | 	if info.Err != nil { | ||||||
|  | 		ct.root.SetStatus(tracer.SpanStatusError, info.Err.Error()) | ||||||
|  | 	} | ||||||
|  | 	ct.end("http.send", info.Err) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) got100Continue() { | ||||||
|  | 	if sp, ok := ct.span("http.receive"); ok && sp != nil { | ||||||
|  | 		sp.AddEvent("GOT 100 - Continue") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) wait100Continue() { | ||||||
|  | 	if sp, ok := ct.span("http.receive"); ok && sp != nil { | ||||||
|  | 		sp.AddEvent("GOT 100 - Wait") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error { | ||||||
|  | 	if sp, ok := ct.span("http.receive"); ok && sp != nil { | ||||||
|  | 		sp.AddEvent("GOT 1xx", | ||||||
|  | 			options.Labels( | ||||||
|  | 				httpStatus, code, | ||||||
|  | 				httpHeaderMIME, sm2s(header), | ||||||
|  | 			), | ||||||
|  | 		) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func sliceToString(value []string) string { | ||||||
|  | 	if len(value) == 0 { | ||||||
|  | 		return "undefined" | ||||||
|  | 	} | ||||||
|  | 	return strings.Join(value, ",") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func sm2s(value map[string][]string) string { | ||||||
|  | 	var buf strings.Builder | ||||||
|  | 	for k, v := range value { | ||||||
|  | 		if buf.Len() != 0 { | ||||||
|  | 			buf.WriteString(",") | ||||||
|  | 		} | ||||||
|  | 		buf.WriteString(k) | ||||||
|  | 		buf.WriteString("=") | ||||||
|  | 		buf.WriteString(sliceToString(v)) | ||||||
|  | 	} | ||||||
|  | 	return buf.String() | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user