diff --git a/metadata/metadata.go b/metadata/metadata.go index 5cb23b9b..4718f420 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -19,6 +19,8 @@ var ( HeaderTimeout = "Micro-Timeout" // HeaderAuthorization specifies Authorization header HeaderAuthorization = "Authorization" + // HeaderXRequestID specifies request id + HeaderXRequestID = "X-Request-Id" ) // Metadata is our way of representing request headers internally. diff --git a/tracer/options.go b/tracer/options.go index cf8c88f6..91d57408 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -91,11 +91,19 @@ type SpanOptions struct { type SpanOption func(o *SpanOptions) // EventOptions contains event options -type EventOptions struct{} +type EventOptions struct { + Labels []interface{} +} // EventOption func signature type EventOption func(o *EventOptions) +func WithEventLabels(labels ...interface{}) EventOption { + return func(o *EventOptions) { + o.Labels = labels + } +} + func WithSpanLabels(labels ...interface{}) SpanOption { return func(o *SpanOptions) { o.Labels = labels diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go index fba21743..58d2a00e 100644 --- a/tracer/wrapper/wrapper.go +++ b/tracer/wrapper/wrapper.go @@ -11,107 +11,89 @@ import ( "go.unistack.org/micro/v3/tracer" ) +var DefaultHeadersExctract = []string{metadata.HeaderTopic, metadata.HeaderEndpoint, metadata.HeaderService, metadata.HeaderXRequestID} + +func extractLabels(md metadata.Metadata) []string { + labels := make([]string, 0, 5) + for _, k := range DefaultHeadersExctract { + if v, ok := md.Get(k); ok { + labels = append(labels, k, v) + } + } + return labels +} + var ( DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]interface{}, 0, len(md)+1) - for k, v := range md { - labels = append(labels, k, v) - } + labels = append(labels, extractLabels(md)) } if err != nil { - labels = append(labels, "error", err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error()) } - labels = append(labels, "kind", sp.Kind()) - sp.SetLabels(labels...) + sp.AddLabels(labels...) } DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]interface{}, 0, len(md)) - for k, v := range md { - labels = append(labels, k, v) - } + labels = append(labels, extractLabels(md)) } if err != nil { - labels = append(labels, "error", err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error()) } - labels = append(labels, "kind", sp.Kind()) - sp.SetLabels(labels...) + sp.AddLabels(labels...) } DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { sp.SetName(fmt.Sprintf("Publish %s", msg.Topic())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]interface{}, 0, len(md)) - for k, v := range md { - labels = append(labels, k, v) - } + labels = append(labels, extractLabels(md)) } if err != nil { - labels = append(labels, "error", err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error()) } - labels = append(labels, "kind", sp.Kind()) - sp.SetLabels(labels...) + sp.AddLabels(labels...) } DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = make([]interface{}, 0, len(md)) - for k, v := range md { - labels = append(labels, k, v) - } + labels = append(labels, extractLabels(md)) } if err != nil { - labels = append(labels, "error", err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error()) } - labels = append(labels, "kind", sp.Kind()) - sp.SetLabels(labels...) + sp.AddLabels(labels...) } DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { sp.SetName(fmt.Sprintf("Subscriber %s", msg.Topic())) var labels []interface{} if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = make([]interface{}, 0, len(md)) - for k, v := range md { - labels = append(labels, k, v) - } + labels = append(labels, extractLabels(md)) } if err != nil { - labels = append(labels, "error", err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error()) } - labels = append(labels, "kind", sp.Kind()) - sp.SetLabels(labels...) + sp.AddLabels(labels...) } DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) var labels []interface{} if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]interface{}, 0, len(md)) - for k, v := range md { - labels = append(labels, k, v) - } + labels = append(labels, extractLabels(md)) } if err != nil { - labels = append(labels, "error", err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error()) } - labels = append(labels, "kind", sp.Kind()) - sp.SetLabels(labels...) + sp.AddLabels(labels...) } DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"} @@ -243,7 +225,14 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ sp, ok := tracer.SpanFromContext(ctx) if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient)) + ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels( + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "unary", + ), + ) } defer sp.Finish() @@ -266,7 +255,14 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie sp, ok := tracer.SpanFromContext(ctx) if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient)) + ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels( + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "stream", + ), + ) } defer sp.Finish() @@ -303,10 +299,23 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i } } + callType := "unary" + if req.Stream() { + callType = "stream" + } + sp, ok := tracer.SpanFromContext(ctx) if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindServer)) + ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-server", + tracer.WithSpanKind(tracer.SpanKindServer), + tracer.WithSpanLabels( + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", callType, + ), + ) } + defer sp.Finish() err := ot.serverHandler(ctx, req, rsp) @@ -368,7 +377,14 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client. sp, ok := tracer.SpanFromContext(ctx) if !ok { - ctx, sp = ot.opts.Tracer.Start(ctx, "", tracer.WithSpanKind(tracer.SpanKindClient)) + ctx, sp = ot.opts.Tracer.Start(ctx, "rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels( + "rpc.flavor", "rpc", + "rpc.call", "/"+req.Service()+"/"+req.Endpoint(), + "rpc.call_type", "unary", + ), + ) } defer sp.Finish()