tracer: improve #241
5
go.mod
5
go.mod
@ -7,7 +7,7 @@ require (
|
|||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/imdario/mergo v0.3.15
|
github.com/imdario/mergo v0.3.15
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5
|
||||||
golang.org/x/sync v0.3.0
|
golang.org/x/sync v0.3.0
|
||||||
google.golang.org/grpc v1.57.0
|
google.golang.org/grpc v1.57.0
|
||||||
google.golang.org/protobuf v1.31.0
|
google.golang.org/protobuf v1.31.0
|
||||||
@ -15,5 +15,6 @@ require (
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
|
golang.org/x/net v0.14.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect
|
||||||
)
|
)
|
||||||
|
15
go.sum
15
go.sum
@ -11,16 +11,17 @@ github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
|
|||||||
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
|
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35 h1:4mohWoM/UGg1BvFFiqSPRl5uwJY3rVV0HQX0ETqauqQ=
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E=
|
||||||
github.com/silas/dag v0.0.0-20211117232152-9d50aa809f35/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I=
|
||||||
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
|
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
|
||||||
|
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
|
||||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||||
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
|
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
|
||||||
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
|
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
|
||||||
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
|
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
|
||||||
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
|
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
|
||||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
@ -7,19 +7,27 @@ import (
|
|||||||
var _ Tracer = (*noopTracer)(nil)
|
var _ Tracer = (*noopTracer)(nil)
|
||||||
|
|
||||||
type noopTracer struct {
|
type noopTracer struct {
|
||||||
opts Options
|
opts Options
|
||||||
|
spans []Span
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *noopTracer) Spans() []Span {
|
||||||
|
return t.spans
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
|
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
|
||||||
|
options := NewSpanOptions(opts...)
|
||||||
span := &noopSpan{
|
span := &noopSpan{
|
||||||
name: name,
|
name: name,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
tracer: t,
|
tracer: t,
|
||||||
opts: NewSpanOptions(opts...),
|
labels: options.Labels,
|
||||||
|
kind: options.Kind,
|
||||||
}
|
}
|
||||||
if span.ctx == nil {
|
if span.ctx == nil {
|
||||||
span.ctx = context.Background()
|
span.ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
t.spans = append(t.spans, span)
|
||||||
return NewSpanContext(ctx, span), span
|
return NewSpanContext(ctx, span), span
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,13 +46,21 @@ func (t *noopTracer) Name() string {
|
|||||||
return t.opts.Name
|
return t.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type noopEvent struct {
|
||||||
|
name string
|
||||||
|
labels []interface{}
|
||||||
|
}
|
||||||
|
|
||||||
type noopSpan struct {
|
type noopSpan struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
tracer Tracer
|
tracer Tracer
|
||||||
name string
|
name string
|
||||||
opts SpanOptions
|
|
||||||
status SpanStatus
|
|
||||||
statusMsg string
|
statusMsg string
|
||||||
|
events []*noopEvent
|
||||||
|
labels []interface{}
|
||||||
|
logs []interface{}
|
||||||
|
kind SpanKind
|
||||||
|
status SpanStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) Finish(opts ...SpanOption) {
|
func (s *noopSpan) Finish(opts ...SpanOption) {
|
||||||
@ -59,22 +75,24 @@ func (s *noopSpan) Tracer() Tracer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
|
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
|
||||||
|
options := NewEventOptions(opts...)
|
||||||
|
s.events = append(s.events, &noopEvent{name: name, labels: options.Labels})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) SetName(name string) {
|
func (s *noopSpan) SetName(name string) {
|
||||||
s.name = name
|
s.name = name
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) SetLabels(labels ...interface{}) {
|
func (s *noopSpan) AddLogs(kv ...interface{}) {
|
||||||
s.opts.Labels = labels
|
s.logs = append(s.logs, kv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) AddLabels(labels ...interface{}) {
|
func (s *noopSpan) AddLabels(kv ...interface{}) {
|
||||||
s.opts.Labels = append(s.opts.Labels, labels...)
|
s.labels = append(s.labels, kv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) Kind() SpanKind {
|
func (s *noopSpan) Kind() SpanKind {
|
||||||
return s.opts.Kind
|
return s.kind
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *noopSpan) Status() (SpanStatus, string) {
|
func (s *noopSpan) Status() (SpanStatus, string) {
|
||||||
|
@ -98,15 +98,15 @@ type EventOptions struct {
|
|||||||
// EventOption func signature
|
// EventOption func signature
|
||||||
type EventOption func(o *EventOptions)
|
type EventOption func(o *EventOptions)
|
||||||
|
|
||||||
func WithEventLabels(labels ...interface{}) EventOption {
|
func WithEventLabels(kv ...interface{}) EventOption {
|
||||||
return func(o *EventOptions) {
|
return func(o *EventOptions) {
|
||||||
o.Labels = labels
|
o.Labels = kv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithSpanLabels(labels ...interface{}) SpanOption {
|
func WithSpanLabels(kv ...interface{}) SpanOption {
|
||||||
return func(o *SpanOptions) {
|
return func(o *SpanOptions) {
|
||||||
o.Labels = labels
|
o.Labels = kv
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,6 +136,15 @@ func Logger(l logger.Logger) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewEventOptions returns default EventOptions
|
||||||
|
func NewEventOptions(opts ...EventOption) EventOptions {
|
||||||
|
options := EventOptions{}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
// NewSpanOptions returns default SpanOptions
|
// NewSpanOptions returns default SpanOptions
|
||||||
func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
func NewSpanOptions(opts ...SpanOption) SpanOptions {
|
||||||
options := SpanOptions{
|
options := SpanOptions{
|
||||||
|
@ -27,8 +27,6 @@ type Span interface {
|
|||||||
Tracer() Tracer
|
Tracer() Tracer
|
||||||
// Finish complete and send span
|
// Finish complete and send span
|
||||||
Finish(opts ...SpanOption)
|
Finish(opts ...SpanOption)
|
||||||
// AddEvent add event to span
|
|
||||||
AddEvent(name string, opts ...EventOption)
|
|
||||||
// Context return context with span
|
// Context return context with span
|
||||||
Context() context.Context
|
Context() context.Context
|
||||||
// SetName set the span name
|
// SetName set the span name
|
||||||
@ -37,10 +35,12 @@ type Span interface {
|
|||||||
SetStatus(status SpanStatus, msg string)
|
SetStatus(status SpanStatus, msg string)
|
||||||
// Status returns span status and msg
|
// Status returns span status and msg
|
||||||
Status() (SpanStatus, string)
|
Status() (SpanStatus, string)
|
||||||
// SetLabels set the span labels
|
// AddLabels append labels to span
|
||||||
SetLabels(labels ...interface{})
|
AddLabels(kv ...interface{})
|
||||||
// AddLabels append the span labels
|
// AddEvent append event to span
|
||||||
AddLabels(labels ...interface{})
|
AddEvent(name string, opts ...EventOption)
|
||||||
|
// AddEvent append event to span
|
||||||
|
AddLogs(kv ...interface{})
|
||||||
// Kind returns span kind
|
// Kind returns span kind
|
||||||
Kind() SpanKind
|
Kind() SpanKind
|
||||||
}
|
}
|
||||||
|
271
util/grpc/tracer.go
Normal file
271
util/grpc/tracer.go
Normal file
@ -0,0 +1,271 @@
|
|||||||
|
package grpc_util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
|
grpc_codes "google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/peer"
|
||||||
|
"google.golang.org/grpc/stats"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
type gRPCContextKey struct{}
|
||||||
|
|
||||||
|
type gRPCContext struct {
|
||||||
|
messagesReceived int64
|
||||||
|
messagesSent int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
Tracer tracer.Tracer
|
||||||
|
}
|
||||||
|
|
||||||
|
type Option func(*Options)
|
||||||
|
|
||||||
|
func Tracer(tr tracer.Tracer) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Tracer = tr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServerHandler creates a stats.Handler for gRPC server.
|
||||||
|
func NewServerHandler(opts ...Option) stats.Handler {
|
||||||
|
options := Options{Tracer: tracer.DefaultTracer}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
h := &serverHandler{
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverHandler struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagRPC can attach some information to the given context.
|
||||||
|
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
||||||
|
name, attrs := parseFullMethod(info.FullMethodName)
|
||||||
|
attrs = append(attrs, "rpc.system", "grpc")
|
||||||
|
ctx, _ = h.opts.Tracer.Start(
|
||||||
|
ctx,
|
||||||
|
name,
|
||||||
|
tracer.WithSpanKind(tracer.SpanKindServer),
|
||||||
|
tracer.WithSpanLabels(attrs...),
|
||||||
|
)
|
||||||
|
|
||||||
|
gctx := gRPCContext{}
|
||||||
|
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleRPC processes the RPC stats.
|
||||||
|
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
|
||||||
|
handleRPC(ctx, rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagConn can attach some information to the given context.
|
||||||
|
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
|
||||||
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
||||||
|
attrs := peerAttr(peerFromCtx(ctx))
|
||||||
|
span.AddLabels(attrs...)
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleConn processes the Conn stats.
|
||||||
|
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientHandler struct {
|
||||||
|
opts Options
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientHandler creates a stats.Handler for gRPC client.
|
||||||
|
func NewClientHandler(opts ...Option) stats.Handler {
|
||||||
|
options := Options{Tracer: tracer.DefaultTracer}
|
||||||
|
for _, o := range opts {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
h := &clientHandler{
|
||||||
|
opts: options,
|
||||||
|
}
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagRPC can attach some information to the given context.
|
||||||
|
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
||||||
|
name, attrs := parseFullMethod(info.FullMethodName)
|
||||||
|
attrs = append(attrs, "rpc.system", "grpc", "rpc.flavor", "grpc", "rpc.call", info.FullMethodName)
|
||||||
|
ctx, _ = h.opts.Tracer.Start(
|
||||||
|
ctx,
|
||||||
|
name,
|
||||||
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
||||||
|
tracer.WithSpanLabels(attrs...),
|
||||||
|
)
|
||||||
|
|
||||||
|
gctx := gRPCContext{}
|
||||||
|
|
||||||
|
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleRPC processes the RPC stats.
|
||||||
|
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
|
||||||
|
handleRPC(ctx, rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TagConn can attach some information to the given context.
|
||||||
|
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
|
||||||
|
// TODO
|
||||||
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
||||||
|
attrs := peerAttr(cti.RemoteAddr.String())
|
||||||
|
span.AddLabels(attrs...)
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleConn processes the Conn stats.
|
||||||
|
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleRPC(ctx context.Context, rs stats.RPCStats) {
|
||||||
|
span, ok := tracer.SpanFromContext(ctx)
|
||||||
|
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
|
||||||
|
var messageID int64
|
||||||
|
if rs.IsClient() {
|
||||||
|
span.AddLabels("span.kind", "client")
|
||||||
|
} else {
|
||||||
|
span.AddLabels("span.kind", "server")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch rs := rs.(type) {
|
||||||
|
case *stats.Begin:
|
||||||
|
if rs.IsClientStream || rs.IsServerStream {
|
||||||
|
span.AddLabels("rpc.call_type", "stream")
|
||||||
|
} else {
|
||||||
|
span.AddLabels("rpc.call_type", "unary")
|
||||||
|
}
|
||||||
|
span.AddEvent("message",
|
||||||
|
tracer.WithEventLabels(
|
||||||
|
"message.begin_time", rs.BeginTime.Format(time.RFC3339),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
case *stats.InPayload:
|
||||||
|
if gctx != nil {
|
||||||
|
messageID = atomic.AddInt64(&gctx.messagesReceived, 1)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
span.AddEvent("message",
|
||||||
|
tracer.WithEventLabels(
|
||||||
|
"message.recv_time", rs.RecvTime.Format(time.RFC3339),
|
||||||
|
"message.type", "RECEIVED",
|
||||||
|
"message.id", messageID,
|
||||||
|
"message.compressed_size", rs.CompressedLength,
|
||||||
|
"message.uncompressed_size", rs.Length,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
case *stats.OutPayload:
|
||||||
|
if gctx != nil {
|
||||||
|
messageID = atomic.AddInt64(&gctx.messagesSent, 1)
|
||||||
|
}
|
||||||
|
if ok {
|
||||||
|
span.AddEvent("message",
|
||||||
|
tracer.WithEventLabels(
|
||||||
|
"message.sent_time", rs.SentTime.Format(time.RFC3339),
|
||||||
|
"message.type", "SENT",
|
||||||
|
"message.id", messageID,
|
||||||
|
"message.compressed_size", rs.CompressedLength,
|
||||||
|
"message.uncompressed_size", rs.Length,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
case *stats.End:
|
||||||
|
if ok {
|
||||||
|
span.AddEvent("message",
|
||||||
|
tracer.WithEventLabels(
|
||||||
|
"message.begin_time", rs.BeginTime.Format(time.RFC3339),
|
||||||
|
"message.end_time", rs.EndTime.Format(time.RFC3339),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if rs.Error != nil {
|
||||||
|
s, _ := status.FromError(rs.Error)
|
||||||
|
span.SetStatus(tracer.SpanStatusError, s.Message())
|
||||||
|
span.AddLabels("rpc.grpc.status_code", s.Code())
|
||||||
|
} else {
|
||||||
|
span.AddLabels("rpc.grpc.status_code", grpc_codes.OK)
|
||||||
|
}
|
||||||
|
span.Finish()
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseFullMethod(fullMethod string) (string, []interface{}) {
|
||||||
|
if !strings.HasPrefix(fullMethod, "/") {
|
||||||
|
// Invalid format, does not follow `/package.service/method`.
|
||||||
|
return fullMethod, nil
|
||||||
|
}
|
||||||
|
name := fullMethod[1:]
|
||||||
|
pos := strings.LastIndex(name, "/")
|
||||||
|
if pos < 0 {
|
||||||
|
// Invalid format, does not follow `/package.service/method`.
|
||||||
|
return name, nil
|
||||||
|
}
|
||||||
|
service, method := name[:pos], name[pos+1:]
|
||||||
|
|
||||||
|
var attrs []interface{}
|
||||||
|
if service != "" {
|
||||||
|
attrs = append(attrs, "rpc.service", service)
|
||||||
|
}
|
||||||
|
if method != "" {
|
||||||
|
attrs = append(attrs, "rpc.method", method)
|
||||||
|
}
|
||||||
|
return name, attrs
|
||||||
|
}
|
||||||
|
|
||||||
|
func peerAttr(addr string) []interface{} {
|
||||||
|
host, p, err := net.SplitHostPort(addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if host == "" {
|
||||||
|
host = "127.0.0.1"
|
||||||
|
}
|
||||||
|
port, err := strconv.Atoi(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var attr []interface{}
|
||||||
|
if ip := net.ParseIP(host); ip != nil {
|
||||||
|
attr = []interface{}{
|
||||||
|
"net.sock.peer.addr", host,
|
||||||
|
"net.sock.peer.port", port,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
attr = []interface{}{
|
||||||
|
"net.peer.name", host,
|
||||||
|
"net.peer.port", port,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return attr
|
||||||
|
}
|
||||||
|
|
||||||
|
func peerFromCtx(ctx context.Context) string {
|
||||||
|
p, ok := peer.FromContext(ctx)
|
||||||
|
if !ok {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return p.Addr.String()
|
||||||
|
}
|
253
util/http/clienttracer.go
Normal file
253
util/http/clienttracer.go
Normal file
@ -0,0 +1,253 @@
|
|||||||
|
//
|
||||||
|
// Copyright The OpenTelemetry Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"net/http/httptrace"
|
||||||
|
"net/textproto"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"go.unistack.org/micro/v3/tracer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
httpStatus = "http.status"
|
||||||
|
httpHeaderMIME = "http.mime"
|
||||||
|
httpRemoteAddr = "http.remote"
|
||||||
|
httpLocalAddr = "http.local"
|
||||||
|
httpHost = "http.host"
|
||||||
|
)
|
||||||
|
|
||||||
|
var hookMap = map[string]string{
|
||||||
|
"http.dns": "http.getconn",
|
||||||
|
"http.connect": "http.getconn",
|
||||||
|
"http.tls": "http.getconn",
|
||||||
|
}
|
||||||
|
|
||||||
|
func parentHook(hook string) string {
|
||||||
|
if strings.HasPrefix(hook, "http.connect") {
|
||||||
|
return hookMap["http.connect"]
|
||||||
|
}
|
||||||
|
return hookMap[hook]
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientTracer struct {
|
||||||
|
context.Context
|
||||||
|
tr tracer.Tracer
|
||||||
|
activeHooks map[string]context.Context
|
||||||
|
root tracer.Span
|
||||||
|
mtx sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClientTrace(ctx context.Context, tr tracer.Tracer) *httptrace.ClientTrace {
|
||||||
|
ct := &clientTracer{
|
||||||
|
Context: ctx,
|
||||||
|
activeHooks: make(map[string]context.Context),
|
||||||
|
tr: tr,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &httptrace.ClientTrace{
|
||||||
|
GetConn: ct.getConn,
|
||||||
|
GotConn: ct.gotConn,
|
||||||
|
PutIdleConn: ct.putIdleConn,
|
||||||
|
GotFirstResponseByte: ct.gotFirstResponseByte,
|
||||||
|
Got100Continue: ct.got100Continue,
|
||||||
|
Got1xxResponse: ct.got1xxResponse,
|
||||||
|
DNSStart: ct.dnsStart,
|
||||||
|
DNSDone: ct.dnsDone,
|
||||||
|
ConnectStart: ct.connectStart,
|
||||||
|
ConnectDone: ct.connectDone,
|
||||||
|
TLSHandshakeStart: ct.tlsHandshakeStart,
|
||||||
|
TLSHandshakeDone: ct.tlsHandshakeDone,
|
||||||
|
WroteHeaderField: ct.wroteHeaderField,
|
||||||
|
WroteHeaders: ct.wroteHeaders,
|
||||||
|
Wait100Continue: ct.wait100Continue,
|
||||||
|
WroteRequest: ct.wroteRequest,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) start(hook, spanName string, attrs ...interface{}) {
|
||||||
|
ct.mtx.Lock()
|
||||||
|
defer ct.mtx.Unlock()
|
||||||
|
|
||||||
|
if hookCtx, found := ct.activeHooks[hook]; !found {
|
||||||
|
var sp tracer.Span
|
||||||
|
ct.activeHooks[hook], sp = ct.tr.Start(ct.getParentContext(hook), spanName, tracer.WithSpanLabels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient))
|
||||||
|
if ct.root == nil {
|
||||||
|
ct.root = sp
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// end was called before start finished, add the start attributes and end the span here
|
||||||
|
if span, ok := tracer.SpanFromContext(hookCtx); ok {
|
||||||
|
span.AddLabels(attrs...)
|
||||||
|
span.Finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(ct.activeHooks, hook)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) end(hook string, err error, attrs ...interface{}) {
|
||||||
|
ct.mtx.Lock()
|
||||||
|
defer ct.mtx.Unlock()
|
||||||
|
if ctx, ok := ct.activeHooks[hook]; ok { // nolint:nestif
|
||||||
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
||||||
|
if err != nil {
|
||||||
|
span.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
}
|
||||||
|
span.AddLabels(attrs...)
|
||||||
|
span.Finish()
|
||||||
|
}
|
||||||
|
delete(ct.activeHooks, hook)
|
||||||
|
} else {
|
||||||
|
// start is not finished before end is called.
|
||||||
|
// Start a span here with the ending attributes that will be finished when start finishes.
|
||||||
|
// Yes, it's backwards. v0v
|
||||||
|
ctx, span := ct.tr.Start(ct.getParentContext(hook), hook, tracer.WithSpanLabels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient))
|
||||||
|
if err != nil {
|
||||||
|
span.SetStatus(tracer.SpanStatusError, err.Error())
|
||||||
|
}
|
||||||
|
ct.activeHooks[hook] = ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) getParentContext(hook string) context.Context {
|
||||||
|
ctx, ok := ct.activeHooks[parentHook(hook)]
|
||||||
|
if !ok {
|
||||||
|
return ct.Context
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) span(hook string) (tracer.Span, bool) {
|
||||||
|
ct.mtx.Lock()
|
||||||
|
defer ct.mtx.Unlock()
|
||||||
|
if ctx, ok := ct.activeHooks[hook]; ok {
|
||||||
|
return tracer.SpanFromContext(ctx)
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) getConn(host string) {
|
||||||
|
ct.start("http.getconn", "http.getconn", httpHost, host)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) {
|
||||||
|
ct.end("http.getconn",
|
||||||
|
nil,
|
||||||
|
httpRemoteAddr, info.Conn.RemoteAddr().String(),
|
||||||
|
httpLocalAddr, info.Conn.LocalAddr().String(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) putIdleConn(err error) {
|
||||||
|
ct.end("http.receive", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) gotFirstResponseByte() {
|
||||||
|
ct.start("http.receive", "http.receive")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) {
|
||||||
|
ct.start("http.dns", "http.dns", httpHost, info.Host)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) {
|
||||||
|
ct.end("http.dns", info.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) connectStart(network, addr string) {
|
||||||
|
_ = network
|
||||||
|
ct.start("http.connect."+addr, "http.connect", httpRemoteAddr, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) connectDone(network, addr string, err error) {
|
||||||
|
_ = network
|
||||||
|
ct.end("http.connect."+addr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) tlsHandshakeStart() {
|
||||||
|
ct.start("http.tls", "http.tls")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) {
|
||||||
|
ct.end("http.tls", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) wroteHeaderField(k string, v []string) {
|
||||||
|
if sp, ok := ct.span("http.headers"); !ok || sp == nil {
|
||||||
|
ct.start("http.headers", "http.headers")
|
||||||
|
}
|
||||||
|
ct.root.AddLabels("http."+strings.ToLower(k), sliceToString(v))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) wroteHeaders() {
|
||||||
|
ct.start("http.send", "http.send")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
|
||||||
|
if info.Err != nil {
|
||||||
|
ct.root.SetStatus(tracer.SpanStatusError, info.Err.Error())
|
||||||
|
}
|
||||||
|
ct.end("http.send", info.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) got100Continue() {
|
||||||
|
if sp, ok := ct.span("http.receive"); ok && sp != nil {
|
||||||
|
sp.AddEvent("GOT 100 - Continue")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) wait100Continue() {
|
||||||
|
if sp, ok := ct.span("http.receive"); ok && sp != nil {
|
||||||
|
sp.AddEvent("GOT 100 - Wait")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error {
|
||||||
|
if sp, ok := ct.span("http.receive"); ok && sp != nil {
|
||||||
|
sp.AddEvent("GOT 1xx",
|
||||||
|
tracer.WithEventLabels(
|
||||||
|
httpStatus, code,
|
||||||
|
httpHeaderMIME, sm2s(header),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sliceToString(value []string) string {
|
||||||
|
if len(value) == 0 {
|
||||||
|
return "undefined"
|
||||||
|
}
|
||||||
|
return strings.Join(value, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
func sm2s(value map[string][]string) string {
|
||||||
|
var buf strings.Builder
|
||||||
|
for k, v := range value {
|
||||||
|
if buf.Len() != 0 {
|
||||||
|
buf.WriteString(",")
|
||||||
|
}
|
||||||
|
buf.WriteString(k)
|
||||||
|
buf.WriteString("=")
|
||||||
|
buf.WriteString(sliceToString(v))
|
||||||
|
}
|
||||||
|
return buf.String()
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user