add SkipEndpoints for wrappers

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-03-25 23:30:38 +03:00
parent eb2a450a7b
commit 6189a1b980
3 changed files with 111 additions and 8 deletions

View File

@ -3,6 +3,7 @@ package wrapper
import ( import (
"context" "context"
"fmt"
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
@ -57,6 +58,8 @@ var (
} }
return labels return labels
} }
DefaultSkipEndpoints = []string{"Meter.Metrics"}
) )
type lWrapper struct { type lWrapper struct {
@ -94,6 +97,8 @@ type Options struct {
ServerHandlerObservers []ServerHandlerObserver ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs // ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
} }
// Option func signature // Option func signature
@ -110,6 +115,7 @@ func NewOptions(opts ...Option) Options {
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
} }
for _, o := range opts { for _, o := range opts {
@ -182,9 +188,23 @@ func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
} }
} }
// SkipEndpoins
func SkipEndpoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
err := l.Client.Call(ctx, req, rsp, opts...) err := l.Client.Call(ctx, req, rsp, opts...)
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled { if !l.opts.Enabled {
return err return err
} }
@ -205,6 +225,13 @@ func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}
func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
stream, err := l.Client.Stream(ctx, req, opts...) stream, err := l.Client.Stream(ctx, req, opts...)
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return stream, err
}
}
if !l.opts.Enabled { if !l.opts.Enabled {
return stream, err return stream, err
} }
@ -225,6 +252,13 @@ func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...clien
func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error {
err := l.Client.Publish(ctx, msg, opts...) err := l.Client.Publish(ctx, msg, opts...)
endpoint := msg.Topic()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled { if !l.opts.Enabled {
return err return err
} }
@ -245,6 +279,13 @@ func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...clie
func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
err := l.serverHandler(ctx, req, rsp) err := l.serverHandler(ctx, req, rsp)
endpoint := req.Endpoint()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled { if !l.opts.Enabled {
return err return err
} }
@ -265,6 +306,13 @@ func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp in
func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error {
err := l.serverSubscriber(ctx, msg) err := l.serverSubscriber(ctx, msg)
endpoint := msg.Topic()
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled { if !l.opts.Enabled {
return err return err
} }
@ -309,6 +357,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
err := l.clientCallFunc(ctx, addr, req, rsp, opts) err := l.clientCallFunc(ctx, addr, req, rsp, opts)
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range l.opts.SkipEndpoints {
if ep == endpoint {
return err
}
}
if !l.opts.Enabled { if !l.opts.Enabled {
return err return err
} }

View File

@ -34,16 +34,18 @@ var (
) )
type Options struct { type Options struct {
Meter meter.Meter Meter meter.Meter
lopts []meter.Option lopts []meter.Option
SkipEndpoints []string
} }
type Option func(*Options) type Option func(*Options)
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
lopts: make([]meter.Option, 0, 5), lopts: make([]meter.Option, 0, 5),
SkipEndpoints: DefaultSkipEndpoints,
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
@ -75,6 +77,12 @@ func Meter(m meter.Meter) Option {
} }
} }
func SkipEndoints(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
type wrapper struct { type wrapper struct {
client.Client client.Client
callFunc client.CallFunc callFunc client.CallFunc
@ -103,7 +111,7 @@ func NewCallWrapper(opts ...Option) client.CallWrapper {
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range DefaultSkipEndpoints { for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint { if ep == endpoint {
return w.callFunc(ctx, addr, req, rsp, opts) return w.callFunc(ctx, addr, req, rsp, opts)
} }
@ -130,7 +138,7 @@ func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request,
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range DefaultSkipEndpoints { for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint { if ep == endpoint {
return w.Client.Call(ctx, req, rsp, opts...) return w.Client.Call(ctx, req, rsp, opts...)
} }
@ -158,7 +166,7 @@ func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{},
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range DefaultSkipEndpoints { for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint { if ep == endpoint {
return w.Client.Stream(ctx, req, opts...) return w.Client.Stream(ctx, req, opts...)
} }
@ -217,7 +225,7 @@ func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error { return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint() endpoint := req.Endpoint()
for _, ep := range DefaultSkipEndpoints { for _, ep := range w.opts.SkipEndpoints {
if ep == endpoint { if ep == endpoint {
return fn(ctx, req, rsp) return fn(ctx, req, rsp)
} }

View File

@ -101,6 +101,8 @@ var (
} }
sp.SetLabels(labels...) sp.SetLabels(labels...)
} }
DefaultSkipEndpoints = []string{"Meter.Metrics"}
) )
type tWrapper struct { type tWrapper struct {
@ -134,6 +136,8 @@ type Options struct {
ServerHandlerObservers []ServerHandlerObserver ServerHandlerObservers []ServerHandlerObserver
// ServerSubscriberObservers funcs // ServerSubscriberObservers funcs
ServerSubscriberObservers []ServerSubscriberObserver ServerSubscriberObservers []ServerSubscriberObserver
// SkipEndpoints
SkipEndpoints []string
} }
// Option func signature // Option func signature
@ -149,6 +153,7 @@ func NewOptions(opts ...Option) Options {
ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver},
ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver},
ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver},
SkipEndpoints: DefaultSkipEndpoints,
} }
for _, o := range opts { for _, o := range opts {
@ -165,6 +170,13 @@ func WithTracer(t tracer.Tracer) Option {
} }
} }
// SkipEndponts
func SkipEndpoins(eps ...string) Option {
return func(o *Options) {
o.SkipEndpoints = append(o.SkipEndpoints, eps...)
}
}
// WithClientCallObservers funcs // WithClientCallObservers funcs
func WithClientCallObservers(ob ...ClientCallObserver) Option { func WithClientCallObservers(ob ...ClientCallObserver) Option {
return func(o *Options) { return func(o *Options) {
@ -208,6 +220,13 @@ func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
} }
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Call(ctx, req, rsp, opts...)
}
}
sp := tracer.SpanFromContext(ctx) sp := tracer.SpanFromContext(ctx)
defer sp.Finish() defer sp.Finish()
@ -221,6 +240,13 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{
} }
func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.Client.Stream(ctx, req, opts...)
}
}
sp := tracer.SpanFromContext(ctx) sp := tracer.SpanFromContext(ctx)
defer sp.Finish() defer sp.Finish()
@ -247,6 +273,13 @@ func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...cli
} }
func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.serverHandler(ctx, req, rsp)
}
}
sp := tracer.SpanFromContext(ctx) sp := tracer.SpanFromContext(ctx)
defer sp.Finish() defer sp.Finish()
@ -297,6 +330,13 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper {
} }
func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
for _, ep := range ot.opts.SkipEndpoints {
if ep == endpoint {
return ot.ClientCallFunc(ctx, addr, req, rsp, opts)
}
}
sp := tracer.SpanFromContext(ctx) sp := tracer.SpanFromContext(ctx)
defer sp.Finish() defer sp.Finish()