From 6189a1b98024b940c679c8397103e08729de50a8 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Thu, 25 Mar 2021 23:30:38 +0300 Subject: [PATCH] add SkipEndpoints for wrappers Signed-off-by: Vasiliy Tolstov --- logger/wrapper/wrapper.go | 55 +++++++++++++++++++++++++++++++++++++++ meter/wrapper/wrapper.go | 24 +++++++++++------ tracer/wrapper/wrapper.go | 40 ++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 8 deletions(-) diff --git a/logger/wrapper/wrapper.go b/logger/wrapper/wrapper.go index c5bb9707..6e818012 100644 --- a/logger/wrapper/wrapper.go +++ b/logger/wrapper/wrapper.go @@ -3,6 +3,7 @@ package wrapper import ( "context" + "fmt" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/logger" @@ -57,6 +58,8 @@ var ( } return labels } + + DefaultSkipEndpoints = []string{"Meter.Metrics"} ) type lWrapper struct { @@ -94,6 +97,8 @@ type Options struct { ServerHandlerObservers []ServerHandlerObserver // ServerSubscriberObservers funcs ServerSubscriberObservers []ServerSubscriberObserver + // SkipEndpoints + SkipEndpoints []string } // Option func signature @@ -110,6 +115,7 @@ func NewOptions(opts ...Option) Options { ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, + SkipEndpoints: DefaultSkipEndpoints, } for _, o := range opts { @@ -182,9 +188,23 @@ func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { } } +// SkipEndpoins +func SkipEndpoints(eps ...string) Option { + return func(o *Options) { + o.SkipEndpoints = append(o.SkipEndpoints, eps...) + } +} + func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { err := l.Client.Call(ctx, req, rsp, opts...) + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + for _, ep := range l.opts.SkipEndpoints { + if ep == endpoint { + return err + } + } + if !l.opts.Enabled { return err } @@ -205,6 +225,13 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{} func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { stream, err := l.Client.Stream(ctx, req, opts...) + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + for _, ep := range l.opts.SkipEndpoints { + if ep == endpoint { + return stream, err + } + } + if !l.opts.Enabled { return stream, err } @@ -225,6 +252,13 @@ func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...clien func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { err := l.Client.Publish(ctx, msg, opts...) + endpoint := msg.Topic() + for _, ep := range l.opts.SkipEndpoints { + if ep == endpoint { + return err + } + } + if !l.opts.Enabled { return err } @@ -245,6 +279,13 @@ func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...clie func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { err := l.serverHandler(ctx, req, rsp) + endpoint := req.Endpoint() + for _, ep := range l.opts.SkipEndpoints { + if ep == endpoint { + return err + } + } + if !l.opts.Enabled { return err } @@ -265,6 +306,13 @@ func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp in func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { err := l.serverSubscriber(ctx, msg) + endpoint := msg.Topic() + for _, ep := range l.opts.SkipEndpoints { + if ep == endpoint { + return err + } + } + if !l.opts.Enabled { return err } @@ -309,6 +357,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper { func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { err := l.clientCallFunc(ctx, addr, req, rsp, opts) + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + for _, ep := range l.opts.SkipEndpoints { + if ep == endpoint { + return err + } + } + if !l.opts.Enabled { return err } diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go index d30c103b..4dab3784 100644 --- a/meter/wrapper/wrapper.go +++ b/meter/wrapper/wrapper.go @@ -34,16 +34,18 @@ var ( ) type Options struct { - Meter meter.Meter - lopts []meter.Option + Meter meter.Meter + lopts []meter.Option + SkipEndpoints []string } type Option func(*Options) func NewOptions(opts ...Option) Options { options := Options{ - Meter: meter.DefaultMeter, - lopts: make([]meter.Option, 0, 5), + Meter: meter.DefaultMeter, + lopts: make([]meter.Option, 0, 5), + SkipEndpoints: DefaultSkipEndpoints, } for _, o := range opts { o(&options) @@ -75,6 +77,12 @@ func Meter(m meter.Meter) Option { } } +func SkipEndoints(eps ...string) Option { + return func(o *Options) { + o.SkipEndpoints = append(o.SkipEndpoints, eps...) + } +} + type wrapper struct { client.Client callFunc client.CallFunc @@ -103,7 +111,7 @@ func NewCallWrapper(opts ...Option) client.CallWrapper { func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range DefaultSkipEndpoints { + for _, ep := range w.opts.SkipEndpoints { if ep == endpoint { return w.callFunc(ctx, addr, req, rsp, opts) } @@ -130,7 +138,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range DefaultSkipEndpoints { + for _, ep := range w.opts.SkipEndpoints { if ep == endpoint { return w.Client.Call(ctx, req, rsp, opts...) } @@ -158,7 +166,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range DefaultSkipEndpoints { + for _, ep := range w.opts.SkipEndpoints { if ep == endpoint { return w.Client.Stream(ctx, req, opts...) } @@ -217,7 +225,7 @@ func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { return func(ctx context.Context, req server.Request, rsp interface{}) error { endpoint := req.Endpoint() - for _, ep := range DefaultSkipEndpoints { + for _, ep := range w.opts.SkipEndpoints { if ep == endpoint { return fn(ctx, req, rsp) } diff --git a/tracer/wrapper/wrapper.go b/tracer/wrapper/wrapper.go index 32fa167b..9a005b1b 100644 --- a/tracer/wrapper/wrapper.go +++ b/tracer/wrapper/wrapper.go @@ -101,6 +101,8 @@ var ( } sp.SetLabels(labels...) } + + DefaultSkipEndpoints = []string{"Meter.Metrics"} ) type tWrapper struct { @@ -134,6 +136,8 @@ type Options struct { ServerHandlerObservers []ServerHandlerObserver // ServerSubscriberObservers funcs ServerSubscriberObservers []ServerSubscriberObserver + // SkipEndpoints + SkipEndpoints []string } // Option func signature @@ -149,6 +153,7 @@ func NewOptions(opts ...Option) Options { ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, + SkipEndpoints: DefaultSkipEndpoints, } for _, o := range opts { @@ -165,6 +170,13 @@ func WithTracer(t tracer.Tracer) Option { } } +// SkipEndponts +func SkipEndpoins(eps ...string) Option { + return func(o *Options) { + o.SkipEndpoints = append(o.SkipEndpoints, eps...) + } +} + // WithClientCallObservers funcs func WithClientCallObservers(ob ...ClientCallObserver) Option { return func(o *Options) { @@ -208,6 +220,13 @@ func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { } func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + for _, ep := range ot.opts.SkipEndpoints { + if ep == endpoint { + return ot.Client.Call(ctx, req, rsp, opts...) + } + } + sp := tracer.SpanFromContext(ctx) defer sp.Finish() @@ -221,6 +240,13 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ } func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + for _, ep := range ot.opts.SkipEndpoints { + if ep == endpoint { + return ot.Client.Stream(ctx, req, opts...) + } + } + sp := tracer.SpanFromContext(ctx) defer sp.Finish() @@ -247,6 +273,13 @@ func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...cli } func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { + endpoint := req.Endpoint() + for _, ep := range ot.opts.SkipEndpoints { + if ep == endpoint { + return ot.serverHandler(ctx, req, rsp) + } + } + sp := tracer.SpanFromContext(ctx) defer sp.Finish() @@ -297,6 +330,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper { } func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + for _, ep := range ot.opts.SkipEndpoints { + if ep == endpoint { + return ot.ClientCallFunc(ctx, addr, req, rsp, opts) + } + } + sp := tracer.SpanFromContext(ctx) defer sp.Finish()