diff --git a/logger/wrapper/wrapper.go b/logger/wrapper/wrapper.go index 86411368..c5bb9707 100644 --- a/logger/wrapper/wrapper.go +++ b/logger/wrapper/wrapper.go @@ -1,4 +1,4 @@ -// Package wrapper provides wrapper for Tracer +// Package wrapper provides wrapper for Logger package wrapper import ( @@ -9,6 +9,56 @@ import ( "github.com/unistack-org/micro/v3/server" ) +var ( + DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels + } + + DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels + } + + DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string { + labels := []string{"endpoint", msg.Topic()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels + } + + DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels + } + + DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string { + labels := []string{"endpoint", msg.Topic()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels + } + + DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels + } +) + type lWrapper struct { client.Client serverHandler server.HandlerFunc @@ -24,20 +74,32 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string type ServerSubscriberObserver func(context.Context, server.Message, error) []string +// Options struct for wrapper type Options struct { - Logger logger.Logger - Level logger.Level - Enabled bool - ClientCallObservers []ClientCallObserver - ClientStreamObservers []ClientStreamObserver - ClientPublishObservers []ClientPublishObserver - ClientCallFuncObservers []ClientCallFuncObserver - ServerHandlerObservers []ServerHandlerObserver + // Logger that used for log + Logger logger.Logger + // Level for logger + Level logger.Level + // Enabled flag + Enabled bool + // ClientCallObservers funcs + ClientCallObservers []ClientCallObserver + // ClientStreamObservers funcs + ClientStreamObservers []ClientStreamObserver + // ClientPublishObservers funcs + ClientPublishObservers []ClientPublishObserver + // ClientCallFuncObservers funcs + ClientCallFuncObservers []ClientCallFuncObserver + // ServerHandlerObservers funcs + ServerHandlerObservers []ServerHandlerObserver + // ServerSubscriberObservers funcs ServerSubscriberObservers []ServerSubscriberObserver } +// Option func signature type Option func(*Options) +// NewOptions creates Options from Option slice func NewOptions(opts ...Option) Options { options := Options{ Logger: logger.DefaultLogger, @@ -57,108 +119,69 @@ func NewOptions(opts ...Option) Options { return options } +// WithEnabled enable/diable flag func WithEnabled(b bool) Option { return func(o *Options) { o.Enabled = b } } +// WithLevel log level func WithLevel(l logger.Level) Option { return func(o *Options) { o.Level = l } } +// WithLogger logger func WithLogger(l logger.Logger) Option { return func(o *Options) { o.Logger = l } } +// WithClientCallObservers funcs func WithClientCallObservers(ob ...ClientCallObserver) Option { return func(o *Options) { o.ClientCallObservers = ob } } +// WithClientStreamObservers funcs func WithClientStreamObservers(ob ...ClientStreamObserver) Option { return func(o *Options) { o.ClientStreamObservers = ob } } +// WithClientPublishObservers funcs func WithClientPublishObservers(ob ...ClientPublishObserver) Option { return func(o *Options) { o.ClientPublishObservers = ob } } +// WithClientCallFuncObservers funcs func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option { return func(o *Options) { o.ClientCallFuncObservers = ob } } +// WithServerHandlerObservers funcs func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { return func(o *Options) { o.ServerHandlerObservers = ob } } +// WithServerSubscriberObservers funcs func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { return func(o *Options) { o.ServerSubscriberObservers = ob } } -func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels -} - -func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels -} - -func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string { - labels := []string{"endpoint", msg.Topic()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels -} - -func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels -} - -func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, err error) []string { - labels := []string{"endpoint", msg.Topic()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels -} - -func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels -} - func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { err := l.Client.Call(ctx, req, rsp, opts...) @@ -259,7 +282,7 @@ func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) err return err } -// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper +// NewClientWrapper accepts an open options and returns a Client Wrapper func NewClientWrapper(opts ...Option) client.Wrapper { return func(c client.Client) client.Client { options := NewOptions() @@ -270,7 +293,7 @@ func NewClientWrapper(opts ...Option) client.Wrapper { } } -// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper +// NewClientCallWrapper accepts an options and returns a Call Wrapper func NewClientCallWrapper(opts ...Option) client.CallWrapper { return func(h client.CallFunc) client.CallFunc { options := NewOptions() @@ -316,7 +339,7 @@ func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { } } -// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper +// NewServerSubscriberWrapper accepts an options and returns a Subscriber Wrapper func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { return func(h server.SubscriberFunc) server.SubscriberFunc { options := NewOptions() diff --git a/metadata/metadata.go b/metadata/metadata.go index 47f94258..d3ee4ec0 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -112,6 +112,7 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata { return nmd } +// Pairs from which metadata created func Pairs(kv ...string) (Metadata, bool) { if len(kv)%2 == 1 { return nil, false diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go index e8785761..32fa167b 100644 --- a/tracer/wrapper/wrapper.go +++ b/tracer/wrapper/wrapper.go @@ -11,6 +11,98 @@ import ( "github.com/unistack-org/micro/v3/tracer" ) +var ( + DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) + } + + DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) + } + + DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) + } + + DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromIncomingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) + } + + DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic())) + var labels []tracer.Label + if md, ok := metadata.FromIncomingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) + } + + DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { + sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) + var labels []tracer.Label + if md, ok := metadata.FromOutgoingContext(ctx); ok { + labels = make([]tracer.Label, 0, len(md)) + for k, v := range md { + labels = append(labels, tracer.String(k, v)) + } + } + if err != nil { + labels = append(labels, tracer.Bool("error", true)) + } + sp.SetLabels(labels...) + } +) + type tWrapper struct { client.Client serverHandler server.HandlerFunc @@ -26,18 +118,28 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error) type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error) +// Options struct type Options struct { - Tracer tracer.Tracer - ClientCallObservers []ClientCallObserver - ClientStreamObservers []ClientStreamObserver - ClientPublishObservers []ClientPublishObserver - ClientCallFuncObservers []ClientCallFuncObserver - ServerHandlerObservers []ServerHandlerObserver + // Tracer that used for tracing + Tracer tracer.Tracer + // ClientCallObservers funcs + ClientCallObservers []ClientCallObserver + // ClientStreamObservers funcs + ClientStreamObservers []ClientStreamObserver + // ClientPublishObservers funcs + ClientPublishObservers []ClientPublishObserver + // ClientCallFuncObservers funcs + ClientCallFuncObservers []ClientCallFuncObserver + // ServerHandlerObservers funcs + ServerHandlerObservers []ServerHandlerObserver + // ServerSubscriberObservers funcs ServerSubscriberObservers []ServerSubscriberObserver } +// Option func signature type Option func(*Options) +// NewOptions create Options from Option slice func NewOptions(opts ...Option) Options { options := Options{ Tracer: tracer.DefaultTracer, @@ -56,138 +158,55 @@ func NewOptions(opts ...Option) Options { return options } +// WithTracer pass tracer func WithTracer(t tracer.Tracer) Option { return func(o *Options) { o.Tracer = t } } +// WithClientCallObservers funcs func WithClientCallObservers(ob ...ClientCallObserver) Option { return func(o *Options) { o.ClientCallObservers = ob } } +// WithClientStreamObservers funcs func WithClientStreamObservers(ob ...ClientStreamObserver) Option { return func(o *Options) { o.ClientStreamObservers = ob } } +// WithClientPublishObservers funcs func WithClientPublishObservers(ob ...ClientPublishObserver) Option { return func(o *Options) { o.ClientPublishObservers = ob } } +// WithClientCallFuncObservers funcs func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option { return func(o *Options) { o.ClientCallFuncObservers = ob } } +// WithServerHandlerObservers funcs func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { return func(o *Options) { o.ServerHandlerObservers = ob } } +// WithServerSubscriberObservers funcs func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { return func(o *Options) { o.ServerSubscriberObservers = ob } } -func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) - var labels []tracer.Label - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]tracer.Label, 0, len(md)) - for k, v := range md { - labels = append(labels, tracer.String(k, v)) - } - } - if err != nil { - labels = append(labels, tracer.Bool("error", true)) - } - sp.SetLabels(labels...) -} - -func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) - var labels []tracer.Label - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]tracer.Label, 0, len(md)) - for k, v := range md { - labels = append(labels, tracer.String(k, v)) - } - } - if err != nil { - labels = append(labels, tracer.Bool("error", true)) - } - sp.SetLabels(labels...) -} - -func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic())) - var labels []tracer.Label - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]tracer.Label, 0, len(md)) - for k, v := range md { - labels = append(labels, tracer.String(k, v)) - } - } - if err != nil { - labels = append(labels, tracer.Bool("error", true)) - } - sp.SetLabels(labels...) -} - -func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) - var labels []tracer.Label - if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = make([]tracer.Label, 0, len(md)) - for k, v := range md { - labels = append(labels, tracer.String(k, v)) - } - } - if err != nil { - labels = append(labels, tracer.Bool("error", true)) - } - sp.SetLabels(labels...) -} - -func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic())) - var labels []tracer.Label - if md, ok := metadata.FromIncomingContext(ctx); ok { - labels = make([]tracer.Label, 0, len(md)) - for k, v := range md { - labels = append(labels, tracer.String(k, v)) - } - } - if err != nil { - labels = append(labels, tracer.Bool("error", true)) - } - sp.SetLabels(labels...) -} - -func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { - sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) - var labels []tracer.Label - if md, ok := metadata.FromOutgoingContext(ctx); ok { - labels = make([]tracer.Label, 0, len(md)) - for k, v := range md { - labels = append(labels, tracer.String(k, v)) - } - } - if err != nil { - labels = append(labels, tracer.Bool("error", true)) - } - sp.SetLabels(labels...) -} - func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { sp := tracer.SpanFromContext(ctx) defer sp.Finish()