tracer: finalize tracer implementation

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-03-04 01:12:16 +03:00
parent a862562284
commit 1f0482fbd5
6 changed files with 475 additions and 189 deletions

View File

@ -3,41 +3,46 @@ package tracer
import (
"context"
"github.com/unistack-org/micro/v3/metadata"
)
const (
traceIDKey = "Micro-Trace-Id"
spanIDKey = "Micro-Span-Id"
)
type tracerKey struct{}
// FromContext returns a span from context
func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", "", false
// FromContext returns a tracer from context
func FromContext(ctx context.Context) Tracer {
if ctx == nil {
return DefaultTracer
}
traceID, traceOk := md.Get(traceIDKey)
microID, microOk := md.Get("Micro-Id")
if !traceOk && !microOk {
isFound = false
return
if tracer, ok := ctx.Value(tracerKey{}).(Tracer); ok {
return tracer
}
if !traceOk {
traceID = microID
}
parentSpanID, ok = md.Get(spanIDKey)
return traceID, parentSpanID, ok
return DefaultTracer
}
// NewContext saves the trace and span ids in the context
func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context {
md, ok := metadata.FromContext(ctx)
if !ok {
md = metadata.New(2)
// NewContext saves the tracer in the context
func NewContext(ctx context.Context, tracer Tracer) context.Context {
if ctx == nil {
ctx = context.Background()
}
md.Set(traceIDKey, traceID)
md.Set(spanIDKey, parentSpanID)
return metadata.NewContext(ctx, md)
return context.WithValue(ctx, tracerKey{}, tracer)
}
type spanKey struct{}
// SpanFromContext returns a span from context
func SpanFromContext(ctx context.Context) Span {
if ctx == nil {
return &noopSpan{}
}
if span, ok := ctx.Value(spanKey{}).(Span); ok {
return span
}
return &noopSpan{}
}
// NewSpanContext saves the span in the context
func NewSpanContext(ctx context.Context, span Span) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, spanKey{}, span)
}

View File

@ -1,99 +0,0 @@
package tracer
import (
"context"
"time"
"github.com/google/uuid"
"github.com/unistack-org/micro/v3/util/ring"
)
type tracer struct {
opts Options
// ring buffer of traces
buffer *ring.Buffer
}
func (t *tracer) Read(opts ...ReadOption) ([]*Span, error) {
var options ReadOptions
for _, o := range opts {
o(&options)
}
sp := t.buffer.Get(t.buffer.Size())
spans := make([]*Span, 0, len(sp))
for _, span := range sp {
val := span.Value.(*Span)
// skip if trace id is specified and doesn't match
if len(options.Trace) > 0 && val.Trace != options.Trace {
continue
}
spans = append(spans, val)
}
return spans, nil
}
func (t *tracer) Start(ctx context.Context, name string) (context.Context, *Span) {
span := &Span{
Name: name,
Trace: uuid.New().String(),
Id: uuid.New().String(),
Started: time.Now(),
Metadata: make(map[string]string),
}
// return span if no context
if ctx == nil {
return NewContext(context.Background(), span.Trace, span.Id), span
}
traceID, parentSpanID, ok := FromContext(ctx)
// If the trace can not be found in the header,
// that means this is where the trace is created.
if !ok {
return NewContext(ctx, span.Trace, span.Id), span
}
// set trace id
span.Trace = traceID
// set parent
span.Parent = parentSpanID
// return the span
return NewContext(ctx, span.Trace, span.Id), span
}
func (t *tracer) Finish(s *Span) error {
// set finished time
s.Duration = time.Since(s.Started)
// save the span
t.buffer.Put(s)
return nil
}
func (t *tracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *tracer) Lookup(ctx context.Context) (*Span, error) {
return nil, nil
}
func (t *tracer) Name() string {
return t.opts.Name
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &tracer{
opts: NewOptions(opts...),
// the last 256 requests
buffer: ring.New(256),
}
}

69
tracer/noop.go Normal file
View File

@ -0,0 +1,69 @@
package tracer
import (
"context"
)
type noopTracer struct {
opts Options
}
func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) {
span := &noopSpan{
name: name,
ctx: ctx,
tracer: t,
}
if span.ctx == nil {
span.ctx = context.Background()
}
return NewSpanContext(ctx, span), span
}
func (t *noopTracer) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *noopTracer) Name() string {
return t.opts.Name
}
type noopSpan struct {
name string
ctx context.Context
tracer Tracer
}
func (s *noopSpan) Finish(opts ...SpanOption) {
}
func (s *noopSpan) Context() context.Context {
return s.ctx
}
func (s *noopSpan) Tracer() Tracer {
return s.tracer
}
func (s *noopSpan) AddEvent(name string, opts ...EventOption) {
}
func (s *noopSpan) SetName(name string) {
s.name = name
}
func (s *noopSpan) SetLabels(labels ...Label) {
}
// NewTracer returns new memory tracer
func NewTracer(opts ...Option) Tracer {
return &noopTracer{
opts: NewOptions(opts...),
}
}

View File

@ -2,39 +2,27 @@ package tracer
import "github.com/unistack-org/micro/v3/logger"
var (
// DefaultSize of the buffer
DefaultSize = 64
)
type SpanOptions struct {
}
type SpanOption func(o *SpanOptions)
type EventOptions struct {
}
type EventOption func(o *EventOptions)
// Options struct
type Options struct {
// Name of the tracer
Name string
// Logger is the logger for messages
Logger logger.Logger
// Size is the size of ring buffer
Size int
}
// Option func
type Option func(o *Options)
// ReadOptions struct
type ReadOptions struct {
// Trace id
Trace string
}
// ReadOption func
type ReadOption func(o *ReadOptions)
// ReadTrace read the given trace
func ReadTrace(t string) ReadOption {
return func(o *ReadOptions) {
o.Trace = t
}
}
// Logger sets the logger
func Logger(l logger.Logger) Option {
return func(o *Options) {
@ -46,7 +34,6 @@ func Logger(l logger.Logger) Option {
func NewOptions(opts ...Option) Options {
options := Options{
Logger: logger.DefaultLogger,
Size: DefaultSize,
}
for _, o := range opts {
o(&options)

View File

@ -3,9 +3,6 @@ package tracer
import (
"context"
"time"
"github.com/unistack-org/micro/v3/metadata"
)
var (
@ -15,44 +12,54 @@ var (
// Tracer is an interface for distributed tracing
type Tracer interface {
// Name return tracer name
Name() string
// Init tracer with options
Init(...Option) error
// Start a trace
Start(ctx context.Context, name string) (context.Context, *Span)
// Finish the trace
Finish(*Span) error
// Lookup get span from context
Lookup(ctx context.Context) (*Span, error)
// Read the traces
Read(...ReadOption) ([]*Span, error)
Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span)
}
// SpanType describe the nature of the trace span
type SpanType int
const (
// SpanTypeRequestInbound is a span created when serving a request
SpanTypeRequestInbound SpanType = iota
// SpanTypeRequestOutbound is a span created when making a service call
SpanTypeRequestOutbound
)
// Span is used to record an entry
type Span struct {
// Id of the trace
Trace string
// name of the span
Name string
// id of the span
Id string
// parent span id
Parent string
// Start time
Started time.Time
// Duration in nano seconds
Duration time.Duration
// associated data
Metadata metadata.Metadata
// Type
Type SpanType
type Span interface {
// Tracer return underlining tracer
Tracer() Tracer
// Finish complete and send span
Finish(opts ...SpanOption)
// AddEvent add event to span
AddEvent(name string, opts ...EventOption)
// Context return context with span
Context() context.Context
// SetName set the span name
SetName(name string)
// SetLabels set the span labels
SetLabels(labels ...Label)
}
type Label struct {
key string
val interface{}
}
func Any(k string, v interface{}) Label {
return Label{k, v}
}
func String(k string, v string) Label {
return Label{k, v}
}
func Int(k string, v int) Label {
return Label{k, v}
}
func Int64(k string, v int64) Label {
return Label{k, v}
}
func Float64(k string, v float64) Label {
return Label{k, v}
}
func Bool(k string, v bool) Label {
return Label{k, v}
}

317
tracer/wrapper/wrapper.go Normal file
View File

@ -0,0 +1,317 @@
// Package wrapper provides wrapper for Tracer
package wrapper
import (
"context"
"fmt"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/tracer"
)
type tWrapper struct {
opts Options
serverHandler server.HandlerFunc
serverSubscriber server.SubscriberFunc
clientCallFunc client.CallFunc
client.Client
}
type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error)
type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error)
type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error)
type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error)
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
type Options struct {
Tracer tracer.Tracer
ClientCallObservers []ClientCallObserver
ClientStreamObservers []ClientStreamObserver
ClientPublishObservers []ClientPublishObserver
ClientCallFuncObservers []ClientCallFuncObserver
ServerHandlerObservers []ServerHandlerObserver
ServerSubscriberObservers []ServerSubscriberObserver
}
type Option func(*Options)
func NewOptions(opts ...Option) Options {
options := Options{
Tracer: tracer.DefaultTracer,
ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver},
ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver},
ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver},
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
}
for _, o := range opts {
o(&options)
}
return options
}
func WithTracer(t tracer.Tracer) Option {
return func(o *Options) {
o.Tracer = t
}
}
func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) {
o.ClientCallObservers = ob
}
}
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
return func(o *Options) {
o.ClientStreamObservers = ob
}
}
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
return func(o *Options) {
o.ClientPublishObservers = ob
}
}
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
return func(o *Options) {
o.ClientCallFuncObservers = ob
}
}
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
return func(o *Options) {
o.ServerHandlerObservers = ob
}
}
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
return func(o *Options) {
o.ServerSubscriberObservers = ob
}
}
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
var labels []tracer.Label
if md, ok := metadata.FromIncomingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
var labels []tracer.Label
if md, ok := metadata.FromOutgoingContext(ctx); ok {
labels = make([]tracer.Label, 0, len(md))
for k, v := range md {
labels = append(labels, tracer.String(k, v))
}
}
if err != nil {
labels = append(labels, tracer.Bool("error", true))
}
sp.SetLabels(labels...)
}
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.Client.Call(ctx, req, rsp, opts...)
for _, o := range ot.opts.ClientCallObservers {
o(ctx, req, rsp, opts, sp, err)
}
return err
}
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
stream, err := ot.Client.Stream(ctx, req, opts...)
for _, o := range ot.opts.ClientStreamObservers {
o(ctx, req, opts, stream, sp, err)
}
return stream, err
}
func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.Client.Publish(ctx, msg, opts...)
for _, o := range ot.opts.ClientPublishObservers {
o(ctx, msg, opts, sp, err)
}
return err
}
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.serverHandler(ctx, req, rsp)
for _, o := range ot.opts.ServerHandlerObservers {
o(ctx, req, rsp, sp, err)
}
return err
}
func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.serverSubscriber(ctx, msg)
for _, o := range ot.opts.ServerSubscriberObservers {
o(ctx, msg, sp, err)
}
return err
}
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
options := NewOptions()
for _, o := range opts {
o(&options)
}
return &tWrapper{opts: options, Client: c}
}
}
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
return func(h client.CallFunc) client.CallFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, clientCallFunc: h}
return ot.ClientCallFunc
}
}
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
sp := tracer.SpanFromContext(ctx)
defer sp.Finish()
err := ot.clientCallFunc(ctx, addr, req, rsp, opts)
for _, o := range ot.opts.ClientCallFuncObservers {
o(ctx, addr, req, rsp, opts, sp, err)
}
return err
}
// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper
func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverHandler: h}
return ot.ServerHandler
}
}
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
return func(h server.SubscriberFunc) server.SubscriberFunc {
options := NewOptions()
for _, o := range opts {
o(&options)
}
ot := &tWrapper{opts: options, serverSubscriber: h}
return ot.ServerSubscriber
}
}