micro/tracer/wrapper/wrapper.go

416 lines
12 KiB
Go

// Package wrapper provides wrapper for Tracer
package wrapper // import "go.unistack.org/micro/v3/tracer/wrapper"
import (
"context"
"fmt"
"strings"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
"go.unistack.org/micro/v3/tracer"
)
var DefaultHeadersExctract = []string{metadata.HeaderXRequestID}
func ExtractDefaultLabels(md metadata.Metadata) []interface{} {
labels := make([]interface{}, 0, len(DefaultHeadersExctract))
for _, k := range DefaultHeadersExctract {
if v, ok := md.Get(k); ok {
labels = append(labels, strings.ToLower(k), v)
}
}
return labels
}
var (
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Metadata())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
var labels []interface{}
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
labels = append(labels, ExtractDefaultLabels(msg.Header())...)
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s call", req.Service(), req.Method()))
var labels []interface{}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = append(labels, ExtractDefaultLabels(md)...)
}
if err != nil {
sp.SetStatus(tracer.SpanStatusError, err.Error())
}
sp.AddLabels(labels...)
}
DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"}
)
type tWrapper struct {
client.Client
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
opts Options
}
type (
ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
)
// Options struct
type Options struct {
// Tracer that used for tracing
Tracer tracer.Tracer
// ClientCallObservers funcs
ClientCallObservers []ClientCallObserver
// ClientStreamObservers funcs
ClientStreamObservers []ClientStreamObserver
// ClientPublishObservers funcs
ClientPublishObservers []ClientPublishObserver
// ClientCallFuncObservers funcs
ClientCallFuncObservers []ClientCallFuncObserver
// ServerHandlerObservers funcs
ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
}
// Option func signature
type Option func(*Options)
// NewOptions create Options from Option slice
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
}
for _, o := range opts {
o(&options)
}
return options
}
// WithTracer pass tracer
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
// WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
// WithClientStreamObservers funcs
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
// WithClientPublishObservers funcs
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
// WithClientCallFuncObservers funcs
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
// WithServerHandlerObservers funcs
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
// WithServerSubscriberObservers funcs
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.Client.Call(nctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(nctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "stream",
),
)
defer sp.Finish()
stream, err := ot.Client.Stream(nctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(nctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" publish", tracer.WithSpanKind(tracer.SpanKindProducer))
defer sp.Finish()
sp.AddLabels("messaging.destination.name", msg.Topic())
sp.AddLabels("messaging.operation", "publish")
err := ot.Client.Publish(nctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(nctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}
callType := "unary"
if req.Stream() {
callType = "stream"
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-server", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", callType,
),
)
defer sp.Finish()
err := ot.serverHandler(nctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(nctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
nctx, sp := ot.opts.Tracer.Start(ctx, msg.Topic()+" process", tracer.WithSpanKind(tracer.SpanKindConsumer))
defer sp.Finish()
sp.AddLabels("messaging.operation", "process")
sp.AddLabels("messaging.source.name", msg.Topic())
err := ot.serverSubscriber(nctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(nctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}
nctx, sp := ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s rpc-client", req.Service(), req.Method()),
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels(
"rpc.service", req.Service(),
"rpc.method", req.Method(),
"rpc.flavor", "rpc",
"rpc.call", "/"+req.Service()+"/"+req.Endpoint(),
"rpc.call_type", "unary",
),
)
defer sp.Finish()
err := ot.clientCallFunc(nctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(nctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}