diff --git a/tracer/context.go b/tracer/context.go index 34d88be9..7f33150d 100644 --- a/tracer/context.go +++ b/tracer/context.go @@ -3,41 +3,46 @@ package tracer import ( "context" - - "github.com/unistack-org/micro/v3/metadata" ) -const ( - traceIDKey = "Micro-Trace-Id" - spanIDKey = "Micro-Span-Id" -) +type tracerKey struct{} -// FromContext returns a span from context -func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) { - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return "", "", false +// FromContext returns a tracer from context +func FromContext(ctx context.Context) Tracer { + if ctx == nil { + return DefaultTracer } - traceID, traceOk := md.Get(traceIDKey) - microID, microOk := md.Get("Micro-Id") - if !traceOk && !microOk { - isFound = false - return + if tracer, ok := ctx.Value(tracerKey{}).(Tracer); ok { + return tracer } - if !traceOk { - traceID = microID - } - parentSpanID, ok = md.Get(spanIDKey) - return traceID, parentSpanID, ok + return DefaultTracer } -// NewContext saves the trace and span ids in the context -func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context { - md, ok := metadata.FromContext(ctx) - if !ok { - md = metadata.New(2) +// NewContext saves the tracer in the context +func NewContext(ctx context.Context, tracer Tracer) context.Context { + if ctx == nil { + ctx = context.Background() } - md.Set(traceIDKey, traceID) - md.Set(spanIDKey, parentSpanID) - return metadata.NewContext(ctx, md) + return context.WithValue(ctx, tracerKey{}, tracer) +} + +type spanKey struct{} + +// SpanFromContext returns a span from context +func SpanFromContext(ctx context.Context) Span { + if ctx == nil { + return &noopSpan{} + } + if span, ok := ctx.Value(spanKey{}).(Span); ok { + return span + } + return &noopSpan{} +} + +// NewSpanContext saves the span in the context +func NewSpanContext(ctx context.Context, span Span) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, spanKey{}, span) } diff --git a/tracer/memory.go b/tracer/memory.go deleted file mode 100644 index 0988960c..00000000 --- a/tracer/memory.go +++ /dev/null @@ -1,99 +0,0 @@ -package tracer - -import ( - "context" - "time" - - "github.com/google/uuid" - "github.com/unistack-org/micro/v3/util/ring" -) - -type tracer struct { - opts Options - // ring buffer of traces - buffer *ring.Buffer -} - -func (t *tracer) Read(opts ...ReadOption) ([]*Span, error) { - var options ReadOptions - for _, o := range opts { - o(&options) - } - - sp := t.buffer.Get(t.buffer.Size()) - - spans := make([]*Span, 0, len(sp)) - - for _, span := range sp { - val := span.Value.(*Span) - // skip if trace id is specified and doesn't match - if len(options.Trace) > 0 && val.Trace != options.Trace { - continue - } - spans = append(spans, val) - } - - return spans, nil -} - -func (t *tracer) Start(ctx context.Context, name string) (context.Context, *Span) { - span := &Span{ - Name: name, - Trace: uuid.New().String(), - Id: uuid.New().String(), - Started: time.Now(), - Metadata: make(map[string]string), - } - - // return span if no context - if ctx == nil { - return NewContext(context.Background(), span.Trace, span.Id), span - } - traceID, parentSpanID, ok := FromContext(ctx) - // If the trace can not be found in the header, - // that means this is where the trace is created. - if !ok { - return NewContext(ctx, span.Trace, span.Id), span - } - - // set trace id - span.Trace = traceID - // set parent - span.Parent = parentSpanID - - // return the span - return NewContext(ctx, span.Trace, span.Id), span -} - -func (t *tracer) Finish(s *Span) error { - // set finished time - s.Duration = time.Since(s.Started) - // save the span - t.buffer.Put(s) - - return nil -} - -func (t *tracer) Init(opts ...Option) error { - for _, o := range opts { - o(&t.opts) - } - return nil -} - -func (t *tracer) Lookup(ctx context.Context) (*Span, error) { - return nil, nil -} - -func (t *tracer) Name() string { - return t.opts.Name -} - -// NewTracer returns new memory tracer -func NewTracer(opts ...Option) Tracer { - return &tracer{ - opts: NewOptions(opts...), - // the last 256 requests - buffer: ring.New(256), - } -} diff --git a/tracer/noop.go b/tracer/noop.go new file mode 100644 index 00000000..ad62ec65 --- /dev/null +++ b/tracer/noop.go @@ -0,0 +1,69 @@ +package tracer + +import ( + "context" +) + +type noopTracer struct { + opts Options +} + +func (t *noopTracer) Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) { + span := &noopSpan{ + name: name, + ctx: ctx, + tracer: t, + } + if span.ctx == nil { + span.ctx = context.Background() + } + return NewSpanContext(ctx, span), span +} + +func (t *noopTracer) Init(opts ...Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *noopTracer) Name() string { + return t.opts.Name +} + +type noopSpan struct { + name string + ctx context.Context + tracer Tracer +} + +func (s *noopSpan) Finish(opts ...SpanOption) { + +} + +func (s *noopSpan) Context() context.Context { + return s.ctx +} + +func (s *noopSpan) Tracer() Tracer { + return s.tracer +} + +func (s *noopSpan) AddEvent(name string, opts ...EventOption) { + +} + +func (s *noopSpan) SetName(name string) { + s.name = name +} + +func (s *noopSpan) SetLabels(labels ...Label) { + +} + +// NewTracer returns new memory tracer +func NewTracer(opts ...Option) Tracer { + return &noopTracer{ + opts: NewOptions(opts...), + } +} diff --git a/tracer/options.go b/tracer/options.go index df7cc80e..52d83ccf 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -2,39 +2,27 @@ package tracer import "github.com/unistack-org/micro/v3/logger" -var ( - // DefaultSize of the buffer - DefaultSize = 64 -) +type SpanOptions struct { +} + +type SpanOption func(o *SpanOptions) + +type EventOptions struct { +} + +type EventOption func(o *EventOptions) // Options struct type Options struct { + // Name of the tracer Name string // Logger is the logger for messages Logger logger.Logger - // Size is the size of ring buffer - Size int } // Option func type Option func(o *Options) -// ReadOptions struct -type ReadOptions struct { - // Trace id - Trace string -} - -// ReadOption func -type ReadOption func(o *ReadOptions) - -// ReadTrace read the given trace -func ReadTrace(t string) ReadOption { - return func(o *ReadOptions) { - o.Trace = t - } -} - // Logger sets the logger func Logger(l logger.Logger) Option { return func(o *Options) { @@ -46,7 +34,6 @@ func Logger(l logger.Logger) Option { func NewOptions(opts ...Option) Options { options := Options{ Logger: logger.DefaultLogger, - Size: DefaultSize, } for _, o := range opts { o(&options) diff --git a/tracer/tracer.go b/tracer/tracer.go index 09c82085..1c37a2eb 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -3,9 +3,6 @@ package tracer import ( "context" - "time" - - "github.com/unistack-org/micro/v3/metadata" ) var ( @@ -15,44 +12,54 @@ var ( // Tracer is an interface for distributed tracing type Tracer interface { + // Name return tracer name Name() string + // Init tracer with options Init(...Option) error // Start a trace - Start(ctx context.Context, name string) (context.Context, *Span) - // Finish the trace - Finish(*Span) error - // Lookup get span from context - Lookup(ctx context.Context) (*Span, error) - // Read the traces - Read(...ReadOption) ([]*Span, error) + Start(ctx context.Context, name string, opts ...SpanOption) (context.Context, Span) } -// SpanType describe the nature of the trace span -type SpanType int - -const ( - // SpanTypeRequestInbound is a span created when serving a request - SpanTypeRequestInbound SpanType = iota - // SpanTypeRequestOutbound is a span created when making a service call - SpanTypeRequestOutbound -) - -// Span is used to record an entry -type Span struct { - // Id of the trace - Trace string - // name of the span - Name string - // id of the span - Id string - // parent span id - Parent string - // Start time - Started time.Time - // Duration in nano seconds - Duration time.Duration - // associated data - Metadata metadata.Metadata - // Type - Type SpanType +type Span interface { + // Tracer return underlining tracer + Tracer() Tracer + // Finish complete and send span + Finish(opts ...SpanOption) + // AddEvent add event to span + AddEvent(name string, opts ...EventOption) + // Context return context with span + Context() context.Context + // SetName set the span name + SetName(name string) + // SetLabels set the span labels + SetLabels(labels ...Label) +} + +type Label struct { + key string + val interface{} +} + +func Any(k string, v interface{}) Label { + return Label{k, v} +} + +func String(k string, v string) Label { + return Label{k, v} +} + +func Int(k string, v int) Label { + return Label{k, v} +} + +func Int64(k string, v int64) Label { + return Label{k, v} +} + +func Float64(k string, v float64) Label { + return Label{k, v} +} + +func Bool(k string, v bool) Label { + return Label{k, v} } diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go new file mode 100644 index 00000000..d1906988 --- /dev/null +++ b/tracer/wrapper/wrapper.go @@ -0,0 +1,317 @@ +// Package wrapper provides wrapper for Tracer +package wrapper + +import ( + "context" + "fmt" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/server" + "github.com/unistack-org/micro/v3/tracer" +) + +type tWrapper struct { + opts Options + serverHandler server.HandlerFunc + serverSubscriber server.SubscriberFunc + clientCallFunc client.CallFunc + client.Client +} + +type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, tracer.Span, error) +type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, tracer.Span, error) +type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, tracer.Span, error) +type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, tracer.Span, error) +type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error) +type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error) + +type Options struct { + Tracer tracer.Tracer + ClientCallObservers []ClientCallObserver + ClientStreamObservers []ClientStreamObserver + ClientPublishObservers []ClientPublishObserver + ClientCallFuncObservers []ClientCallFuncObserver + ServerHandlerObservers []ServerHandlerObserver + ServerSubscriberObservers []ServerSubscriberObserver +} + +type Option func(*Options) + +func NewOptions(opts ...Option) Options { + options := Options{ + Tracer: tracer.DefaultTracer, + ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver}, + ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver}, + ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver}, + ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, + ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, + ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, + } + + for _, o := range opts { + o(&options) + } + + return options +} + +func WithTracer(t tracer.Tracer) Option { + return func(o *Options) { + o.Tracer = t + } +} + +func WithClientCallObservers(ob ...ClientCallObserver) Option { + return func(o *Options) { + o.ClientCallObservers = ob + } +} + +func WithClientStreamObservers(ob ...ClientStreamObserver) Option { + return func(o *Options) { + o.ClientStreamObservers = ob + } +} + +func WithClientPublishObservers(ob ...ClientPublishObserver) Option { + return func(o *Options) { + o.ClientPublishObservers = ob + } +} + +func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option { + return func(o *Options) { + o.ClientCallFuncObservers = ob + } +} + +func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { + return func(o *Options) { + o.ServerHandlerObservers = ob + } +} + +func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { + return func(o *Options) { + o.ServerSubscriberObservers = ob + } +} + +func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) +} + +func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) +} + +func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) +} + +func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromIncomingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) +} + +func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic())) + var labels []tracer.Label + if md, ok := metadata.FromIncomingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) +} + +func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) +} + +func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + sp := tracer.SpanFromContext(ctx) + defer sp.Finish() + + err := ot.Client.Call(ctx, req, rsp, opts...) + + for _, o := range ot.opts.ClientCallObservers { + o(ctx, req, rsp, opts, sp, err) + } + + return err +} + +func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + sp := tracer.SpanFromContext(ctx) + defer sp.Finish() + + stream, err := ot.Client.Stream(ctx, req, opts...) + + for _, o := range ot.opts.ClientStreamObservers { + o(ctx, req, opts, stream, sp, err) + } + + return stream, err +} + +func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { + sp := tracer.SpanFromContext(ctx) + defer sp.Finish() + + err := ot.Client.Publish(ctx, msg, opts...) + + for _, o := range ot.opts.ClientPublishObservers { + o(ctx, msg, opts, sp, err) + } + + return err +} + +func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { + sp := tracer.SpanFromContext(ctx) + defer sp.Finish() + + err := ot.serverHandler(ctx, req, rsp) + + for _, o := range ot.opts.ServerHandlerObservers { + o(ctx, req, rsp, sp, err) + } + + return err +} + +func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { + sp := tracer.SpanFromContext(ctx) + defer sp.Finish() + + err := ot.serverSubscriber(ctx, msg) + + for _, o := range ot.opts.ServerSubscriberObservers { + o(ctx, msg, sp, err) + } + + return err +} + +// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper +func NewClientWrapper(opts ...Option) client.Wrapper { + return func(c client.Client) client.Client { + options := NewOptions() + for _, o := range opts { + o(&options) + } + return &tWrapper{opts: options, Client: c} + } +} + +// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper +func NewClientCallWrapper(opts ...Option) client.CallWrapper { + return func(h client.CallFunc) client.CallFunc { + options := NewOptions() + for _, o := range opts { + o(&options) + } + + ot := &tWrapper{opts: options, clientCallFunc: h} + return ot.ClientCallFunc + } +} + +func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + sp := tracer.SpanFromContext(ctx) + defer sp.Finish() + + err := ot.clientCallFunc(ctx, addr, req, rsp, opts) + + for _, o := range ot.opts.ClientCallFuncObservers { + o(ctx, addr, req, rsp, opts, sp, err) + } + + return err +} + +// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper +func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { + return func(h server.HandlerFunc) server.HandlerFunc { + options := NewOptions() + for _, o := range opts { + o(&options) + } + + ot := &tWrapper{opts: options, serverHandler: h} + return ot.ServerHandler + } +} + +// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper +func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { + return func(h server.SubscriberFunc) server.SubscriberFunc { + options := NewOptions() + for _, o := range opts { + o(&options) + } + + ot := &tWrapper{opts: options, serverSubscriber: h} + return ot.ServerSubscriber + } +}