From 247c7f6fa236e9c6f4e13c26529cdc54fa87d6c6 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 8 Sep 2023 13:58:11 +0300 Subject: [PATCH] tracer: improve --- go.mod | 5 +- go.sum | 13 +- tracer/noop.go | 36 ++++-- tracer/options.go | 17 +-- tracer/tracer.go | 12 +- util/grpc/tracer.go | 256 ++++++++++++++++++++++++++++++++++++++ util/http/clienttracer.go | 254 +++++++++++++++++++++++++++++++++++++ 7 files changed, 562 insertions(+), 31 deletions(-) create mode 100644 util/grpc/tracer.go create mode 100644 util/http/clienttracer.go diff --git a/go.mod b/go.mod index 6c1b2657..331e758b 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,13 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 golang.org/x/sync v0.3.0 - golang.org/x/sys v0.7.0 + golang.org/x/sys v0.11.0 google.golang.org/grpc v1.57.0 google.golang.org/protobuf v1.31.0 ) require ( github.com/golang/protobuf v1.5.3 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + golang.org/x/net v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect ) diff --git a/go.sum b/go.sum index 10d3f8fc..02a83b09 100644 --- a/go.sum +++ b/go.sum @@ -13,15 +13,16 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/tracer/noop.go b/tracer/noop.go index 77f53191..7fd2cc0a 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -9,19 +9,27 @@ import ( var _ Tracer = (*noopTracer)(nil) type noopTracer struct { - opts Options + opts Options + spans []Span +} + +func (t *noopTracer) Spans() []Span { + return t.spans } func (t *noopTracer) Start(ctx context.Context, name string, opts ...options.Option) (context.Context, Span) { + options := NewSpanOptions(opts...) span := &noopSpan{ name: name, ctx: ctx, tracer: t, - opts: NewSpanOptions(opts...), + labels: options.Labels, + kind: options.Kind, } if span.ctx == nil { span.ctx = context.Background() } + t.spans = append(t.spans, span) return NewSpanContext(ctx, span), span } @@ -40,13 +48,21 @@ func (t *noopTracer) Name() string { return t.opts.Name } +type noopEvent struct { + name string + labels []interface{} +} + type noopSpan struct { ctx context.Context tracer Tracer name string - opts SpanOptions - status SpanStatus statusMsg string + events []*noopEvent + labels []interface{} + logs []interface{} + kind SpanKind + status SpanStatus } func (s *noopSpan) Finish(opts ...options.Option) { @@ -61,22 +77,24 @@ func (s *noopSpan) Tracer() Tracer { } func (s *noopSpan) AddEvent(name string, opts ...options.Option) { + options := NewEventOptions(opts...) + s.events = append(s.events, &noopEvent{name: name, labels: options.Labels}) } func (s *noopSpan) SetName(name string) { s.name = name } -func (s *noopSpan) SetLabels(labels ...interface{}) { - s.opts.Labels = labels +func (s *noopSpan) AddLogs(kv ...interface{}) { + s.logs = append(s.logs, kv...) } -func (s *noopSpan) AddLabels(labels ...interface{}) { - s.opts.Labels = append(s.opts.Labels, labels...) +func (s *noopSpan) AddLabels(kv ...interface{}) { + s.labels = append(s.labels, kv...) } func (s *noopSpan) Kind() SpanKind { - return s.opts.Kind + return s.kind } func (s *noopSpan) Status() (SpanStatus, string) { diff --git a/tracer/options.go b/tracer/options.go index 06f2600d..f5a670d2 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -93,14 +93,6 @@ type EventOptions struct { Labels []interface{} } -func WithEventLabels(labels ...interface{}) options.Option { - return options.Labels(labels...) -} - -func WithSpanLabels(labels ...interface{}) options.Option { - return options.Labels(labels...) -} - func WithSpanKind(k SpanKind) options.Option { return func(src interface{}) error { return options.Set(src, k, ".Kind") @@ -128,6 +120,15 @@ func NewSpanOptions(opts ...options.Option) SpanOptions { return options } +// NewEventOptions returns default EventOptions +func NewEventOptions(opts ...options.Option) EventOptions { + options := EventOptions{} + for _, o := range opts { + o(&options) + } + return options +} + // NewOptions returns default options func NewOptions(opts ...options.Option) Options { options := Options{ diff --git a/tracer/tracer.go b/tracer/tracer.go index b76401e0..387cdee5 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -29,8 +29,6 @@ type Span interface { Tracer() Tracer // Finish complete and send span Finish(opts ...options.Option) - // AddEvent add event to span - AddEvent(name string, opts ...options.Option) // Context return context with span Context() context.Context // SetName set the span name @@ -39,10 +37,12 @@ type Span interface { SetStatus(status SpanStatus, msg string) // Status returns span status and msg Status() (SpanStatus, string) - // SetLabels set the span labels - SetLabels(labels ...interface{}) - // AddLabels append the span labels - AddLabels(labels ...interface{}) + // AddLabels append labels to span + AddLabels(kv ...interface{}) + // AddEvent append event to span + AddEvent(name string, opts ...options.Option) + // AddLogs append logs to span + AddLogs(kv ...interface{}) // Kind returns span kind Kind() SpanKind } diff --git a/util/grpc/tracer.go b/util/grpc/tracer.go new file mode 100644 index 00000000..3a9dd524 --- /dev/null +++ b/util/grpc/tracer.go @@ -0,0 +1,256 @@ +package grpc_util + +import ( + "context" + "net" + "strconv" + "strings" + "sync/atomic" + "time" + + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/tracer" + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +type gRPCContextKey struct{} + +type gRPCContext struct { + messagesReceived int64 + messagesSent int64 +} + +type Options struct { + Tracer tracer.Tracer +} + +// NewServerHandler creates a stats.Handler for gRPC server. +func NewServerHandler(tr tracer.Tracer) stats.Handler { + h := &serverHandler{ + tr: tr, + } + return h +} + +type serverHandler struct { + tr tracer.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + name, attrs := parseFullMethod(info.FullMethodName) + attrs = append(attrs, "rpc.system", "grpc") + ctx, _ = h.tr.Start( + ctx, + name, + tracer.WithSpanKind(tracer.SpanKindServer), + options.Labels(attrs...), + ) + + gctx := gRPCContext{} + return context.WithValue(ctx, gRPCContextKey{}, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + if span, ok := tracer.SpanFromContext(ctx); ok { + attrs := peerAttr(peerFromCtx(ctx)) + span.AddLabels(attrs...) + } + return ctx +} + +// HandleConn processes the Conn stats. +func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { +} + +type clientHandler struct { + tr tracer.Tracer +} + +// NewClientHandler creates a stats.Handler for gRPC client. +func NewClientHandler(tr tracer.Tracer) stats.Handler { + h := &clientHandler{ + tr: tr, + } + return h +} + +// TagRPC can attach some information to the given context. +func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + name, attrs := parseFullMethod(info.FullMethodName) + attrs = append(attrs, "rpc.system", "grpc", "rpc.flavor", "grpc", "rpc.call", info.FullMethodName) + ctx, _ = h.tr.Start( + ctx, + name, + tracer.WithSpanKind(tracer.SpanKindClient), + options.Labels(attrs...), + ) + + gctx := gRPCContext{} + + return context.WithValue(ctx, gRPCContextKey{}, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { + // TODO + if span, ok := tracer.SpanFromContext(ctx); ok { + attrs := peerAttr(cti.RemoteAddr.String()) + span.AddLabels(attrs...) + } + return ctx +} + +// HandleConn processes the Conn stats. +func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { + // no-op +} + +func handleRPC(ctx context.Context, rs stats.RPCStats) { + span, ok := tracer.SpanFromContext(ctx) + gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) + var messageID int64 + if rs.IsClient() { + span.AddLabels("span.kind", "client") + } else { + span.AddLabels("span.kind", "server") + } + + switch rs := rs.(type) { + case *stats.Begin: + if rs.IsClientStream || rs.IsServerStream { + span.AddLabels("rpc.call_type", "stream") + } else { + span.AddLabels("rpc.call_type", "unary") + } + span.AddEvent("message", + options.Labels( + "message.begin_time", rs.BeginTime.Format(time.RFC3339), + ), + ) + case *stats.InPayload: + if gctx != nil { + messageID = atomic.AddInt64(&gctx.messagesReceived, 1) + } + if ok { + span.AddEvent("message", + options.Labels( + "message.recv_time", rs.RecvTime.Format(time.RFC3339), + "message.type", "RECEIVED", + "message.id", messageID, + "message.compressed_size", rs.CompressedLength, + "message.uncompressed_size", rs.Length, + ), + ) + } + case *stats.OutPayload: + if gctx != nil { + messageID = atomic.AddInt64(&gctx.messagesSent, 1) + } + if ok { + span.AddEvent("message", + options.Labels( + "message.sent_time", rs.SentTime.Format(time.RFC3339), + "message.type", "SENT", + "message.id", messageID, + "message.compressed_size", rs.CompressedLength, + "message.uncompressed_size", rs.Length, + ), + ) + } + case *stats.End: + if ok { + span.AddEvent("message", + options.Labels( + "message.begin_time", rs.BeginTime.Format(time.RFC3339), + "message.end_time", rs.EndTime.Format(time.RFC3339), + ), + ) + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(tracer.SpanStatusError, s.Message()) + span.AddLabels("rpc.grpc.status_code", s.Code()) + } else { + span.AddLabels("rpc.grpc.status_code", grpc_codes.OK) + } + span.Finish() + } + default: + return + } +} + +func parseFullMethod(fullMethod string) (string, []interface{}) { + if !strings.HasPrefix(fullMethod, "/") { + // Invalid format, does not follow `/package.service/method`. + return fullMethod, nil + } + name := fullMethod[1:] + pos := strings.LastIndex(name, "/") + if pos < 0 { + // Invalid format, does not follow `/package.service/method`. + return name, nil + } + service, method := name[:pos], name[pos+1:] + + var attrs []interface{} + if service != "" { + attrs = append(attrs, "rpc.service", service) + } + if method != "" { + attrs = append(attrs, "rpc.method", method) + } + return name, attrs +} + +func peerAttr(addr string) []interface{} { + host, p, err := net.SplitHostPort(addr) + if err != nil { + return nil + } + + if host == "" { + host = "127.0.0.1" + } + port, err := strconv.Atoi(p) + if err != nil { + return nil + } + + var attr []interface{} + if ip := net.ParseIP(host); ip != nil { + attr = []interface{}{ + "net.sock.peer.addr", host, + "net.sock.peer.port", port, + } + } else { + attr = []interface{}{ + "net.peer.name", host, + "net.peer.port", port, + } + } + + return attr +} + +func peerFromCtx(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "" + } + return p.Addr.String() +} diff --git a/util/http/clienttracer.go b/util/http/clienttracer.go new file mode 100644 index 00000000..4684453a --- /dev/null +++ b/util/http/clienttracer.go @@ -0,0 +1,254 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "context" + "crypto/tls" + "net/http/httptrace" + "net/textproto" + "strings" + "sync" + + "go.unistack.org/micro/v4/options" + "go.unistack.org/micro/v4/tracer" +) + +const ( + httpStatus = "http.status" + httpHeaderMIME = "http.mime" + httpRemoteAddr = "http.remote" + httpLocalAddr = "http.local" + httpHost = "http.host" +) + +var hookMap = map[string]string{ + "http.dns": "http.getconn", + "http.connect": "http.getconn", + "http.tls": "http.getconn", +} + +func parentHook(hook string) string { + if strings.HasPrefix(hook, "http.connect") { + return hookMap["http.connect"] + } + return hookMap[hook] +} + +type clientTracer struct { + context.Context + tr tracer.Tracer + activeHooks map[string]context.Context + root tracer.Span + mtx sync.Mutex +} + +func NewClientTrace(ctx context.Context, tr tracer.Tracer) *httptrace.ClientTrace { + ct := &clientTracer{ + Context: ctx, + activeHooks: make(map[string]context.Context), + tr: tr, + } + + return &httptrace.ClientTrace{ + GetConn: ct.getConn, + GotConn: ct.gotConn, + PutIdleConn: ct.putIdleConn, + GotFirstResponseByte: ct.gotFirstResponseByte, + Got100Continue: ct.got100Continue, + Got1xxResponse: ct.got1xxResponse, + DNSStart: ct.dnsStart, + DNSDone: ct.dnsDone, + ConnectStart: ct.connectStart, + ConnectDone: ct.connectDone, + TLSHandshakeStart: ct.tlsHandshakeStart, + TLSHandshakeDone: ct.tlsHandshakeDone, + WroteHeaderField: ct.wroteHeaderField, + WroteHeaders: ct.wroteHeaders, + Wait100Continue: ct.wait100Continue, + WroteRequest: ct.wroteRequest, + } +} + +func (ct *clientTracer) start(hook, spanName string, attrs ...interface{}) { + ct.mtx.Lock() + defer ct.mtx.Unlock() + + if hookCtx, found := ct.activeHooks[hook]; !found { + var sp tracer.Span + ct.activeHooks[hook], sp = ct.tr.Start(ct.getParentContext(hook), spanName, options.Labels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient)) + if ct.root == nil { + ct.root = sp + } + } else { + // end was called before start finished, add the start attributes and end the span here + if span, ok := tracer.SpanFromContext(hookCtx); ok { + span.AddLabels(attrs...) + span.Finish() + } + + delete(ct.activeHooks, hook) + } +} + +func (ct *clientTracer) end(hook string, err error, attrs ...interface{}) { + ct.mtx.Lock() + defer ct.mtx.Unlock() + if ctx, ok := ct.activeHooks[hook]; ok { // nolint:nestif + if span, ok := tracer.SpanFromContext(ctx); ok { + if err != nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } + span.AddLabels(attrs...) + span.Finish() + } + delete(ct.activeHooks, hook) + } else { + // start is not finished before end is called. + // Start a span here with the ending attributes that will be finished when start finishes. + // Yes, it's backwards. v0v + ctx, span := ct.tr.Start(ct.getParentContext(hook), hook, options.Labels(attrs...), tracer.WithSpanKind(tracer.SpanKindClient)) + if err != nil { + span.SetStatus(tracer.SpanStatusError, err.Error()) + } + ct.activeHooks[hook] = ctx + } +} + +func (ct *clientTracer) getParentContext(hook string) context.Context { + ctx, ok := ct.activeHooks[parentHook(hook)] + if !ok { + return ct.Context + } + return ctx +} + +func (ct *clientTracer) span(hook string) (tracer.Span, bool) { + ct.mtx.Lock() + defer ct.mtx.Unlock() + if ctx, ok := ct.activeHooks[hook]; ok { + return tracer.SpanFromContext(ctx) + } + return nil, false +} + +func (ct *clientTracer) getConn(host string) { + ct.start("http.getconn", "http.getconn", httpHost, host) +} + +func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) { + ct.end("http.getconn", + nil, + httpRemoteAddr, info.Conn.RemoteAddr().String(), + httpLocalAddr, info.Conn.LocalAddr().String(), + ) +} + +func (ct *clientTracer) putIdleConn(err error) { + ct.end("http.receive", err) +} + +func (ct *clientTracer) gotFirstResponseByte() { + ct.start("http.receive", "http.receive") +} + +func (ct *clientTracer) dnsStart(info httptrace.DNSStartInfo) { + ct.start("http.dns", "http.dns", httpHost, info.Host) +} + +func (ct *clientTracer) dnsDone(info httptrace.DNSDoneInfo) { + ct.end("http.dns", info.Err) +} + +func (ct *clientTracer) connectStart(network, addr string) { + _ = network + ct.start("http.connect."+addr, "http.connect", httpRemoteAddr, addr) +} + +func (ct *clientTracer) connectDone(network, addr string, err error) { + _ = network + ct.end("http.connect."+addr, err) +} + +func (ct *clientTracer) tlsHandshakeStart() { + ct.start("http.tls", "http.tls") +} + +func (ct *clientTracer) tlsHandshakeDone(_ tls.ConnectionState, err error) { + ct.end("http.tls", err) +} + +func (ct *clientTracer) wroteHeaderField(k string, v []string) { + if sp, ok := ct.span("http.headers"); !ok || sp == nil { + ct.start("http.headers", "http.headers") + } + ct.root.AddLabels("http."+strings.ToLower(k), sliceToString(v)) +} + +func (ct *clientTracer) wroteHeaders() { + ct.start("http.send", "http.send") +} + +func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) { + if info.Err != nil { + ct.root.SetStatus(tracer.SpanStatusError, info.Err.Error()) + } + ct.end("http.send", info.Err) +} + +func (ct *clientTracer) got100Continue() { + if sp, ok := ct.span("http.receive"); ok && sp != nil { + sp.AddEvent("GOT 100 - Continue") + } +} + +func (ct *clientTracer) wait100Continue() { + if sp, ok := ct.span("http.receive"); ok && sp != nil { + sp.AddEvent("GOT 100 - Wait") + } +} + +func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error { + if sp, ok := ct.span("http.receive"); ok && sp != nil { + sp.AddEvent("GOT 1xx", + options.Labels( + httpStatus, code, + httpHeaderMIME, sm2s(header), + ), + ) + } + return nil +} + +func sliceToString(value []string) string { + if len(value) == 0 { + return "undefined" + } + return strings.Join(value, ",") +} + +func sm2s(value map[string][]string) string { + var buf strings.Builder + for k, v := range value { + if buf.Len() != 0 { + buf.WriteString(",") + } + buf.WriteString(k) + buf.WriteString("=") + buf.WriteString(sliceToString(v)) + } + return buf.String() +} -- 2.45.2