lint fixes
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
a102e95433
commit
00eaae717b
@ -1,4 +1,4 @@
|
|||||||
// Package wrapper provides wrapper for Tracer
|
// Package wrapper provides wrapper for Logger
|
||||||
package wrapper
|
package wrapper
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -9,6 +9,56 @@ import (
|
|||||||
"github.com/unistack-org/micro/v3/server"
|
"github.com/unistack-org/micro/v3/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
|
||||||
|
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, "error", err.Error())
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
|
||||||
|
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, "error", err.Error())
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
|
||||||
|
labels := []string{"endpoint", msg.Topic()}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, "error", err.Error())
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
|
||||||
|
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, "error", err.Error())
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, err error) []string {
|
||||||
|
labels := []string{"endpoint", msg.Topic()}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, "error", err.Error())
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
|
||||||
|
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, "error", err.Error())
|
||||||
|
}
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
type lWrapper struct {
|
type lWrapper struct {
|
||||||
client.Client
|
client.Client
|
||||||
serverHandler server.HandlerFunc
|
serverHandler server.HandlerFunc
|
||||||
@ -24,20 +74,32 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf
|
|||||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
|
type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string
|
||||||
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
|
type ServerSubscriberObserver func(context.Context, server.Message, error) []string
|
||||||
|
|
||||||
|
// Options struct for wrapper
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Logger logger.Logger
|
// Logger that used for log
|
||||||
Level logger.Level
|
Logger logger.Logger
|
||||||
Enabled bool
|
// Level for logger
|
||||||
ClientCallObservers []ClientCallObserver
|
Level logger.Level
|
||||||
ClientStreamObservers []ClientStreamObserver
|
// Enabled flag
|
||||||
ClientPublishObservers []ClientPublishObserver
|
Enabled bool
|
||||||
ClientCallFuncObservers []ClientCallFuncObserver
|
// ClientCallObservers funcs
|
||||||
ServerHandlerObservers []ServerHandlerObserver
|
ClientCallObservers []ClientCallObserver
|
||||||
|
// ClientStreamObservers funcs
|
||||||
|
ClientStreamObservers []ClientStreamObserver
|
||||||
|
// ClientPublishObservers funcs
|
||||||
|
ClientPublishObservers []ClientPublishObserver
|
||||||
|
// ClientCallFuncObservers funcs
|
||||||
|
ClientCallFuncObservers []ClientCallFuncObserver
|
||||||
|
// ServerHandlerObservers funcs
|
||||||
|
ServerHandlerObservers []ServerHandlerObserver
|
||||||
|
// ServerSubscriberObservers funcs
|
||||||
ServerSubscriberObservers []ServerSubscriberObserver
|
ServerSubscriberObservers []ServerSubscriberObserver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Option func signature
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
|
// NewOptions creates Options from Option slice
|
||||||
func NewOptions(opts ...Option) Options {
|
func NewOptions(opts ...Option) Options {
|
||||||
options := Options{
|
options := Options{
|
||||||
Logger: logger.DefaultLogger,
|
Logger: logger.DefaultLogger,
|
||||||
@ -57,108 +119,69 @@ func NewOptions(opts ...Option) Options {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithEnabled enable/diable flag
|
||||||
func WithEnabled(b bool) Option {
|
func WithEnabled(b bool) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Enabled = b
|
o.Enabled = b
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLevel log level
|
||||||
func WithLevel(l logger.Level) Option {
|
func WithLevel(l logger.Level) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Level = l
|
o.Level = l
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLogger logger
|
||||||
func WithLogger(l logger.Logger) Option {
|
func WithLogger(l logger.Logger) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Logger = l
|
o.Logger = l
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientCallObservers funcs
|
||||||
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientCallObservers = ob
|
o.ClientCallObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientStreamObservers funcs
|
||||||
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientStreamObservers = ob
|
o.ClientStreamObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientPublishObservers funcs
|
||||||
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientPublishObservers = ob
|
o.ClientPublishObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientCallFuncObservers funcs
|
||||||
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientCallFuncObservers = ob
|
o.ClientCallFuncObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithServerHandlerObservers funcs
|
||||||
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ServerHandlerObservers = ob
|
o.ServerHandlerObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithServerSubscriberObservers funcs
|
||||||
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ServerSubscriberObservers = ob
|
o.ServerSubscriberObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string {
|
|
||||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, "error", err.Error())
|
|
||||||
}
|
|
||||||
return labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string {
|
|
||||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, "error", err.Error())
|
|
||||||
}
|
|
||||||
return labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string {
|
|
||||||
labels := []string{"endpoint", msg.Topic()}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, "error", err.Error())
|
|
||||||
}
|
|
||||||
return labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, err error) []string {
|
|
||||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, "error", err.Error())
|
|
||||||
}
|
|
||||||
return labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, err error) []string {
|
|
||||||
labels := []string{"endpoint", msg.Topic()}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, "error", err.Error())
|
|
||||||
}
|
|
||||||
return labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string {
|
|
||||||
labels := []string{"service", req.Service(), "endpoint", req.Endpoint()}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, "error", err.Error())
|
|
||||||
}
|
|
||||||
return labels
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
err := l.Client.Call(ctx, req, rsp, opts...)
|
err := l.Client.Call(ctx, req, rsp, opts...)
|
||||||
|
|
||||||
@ -259,7 +282,7 @@ func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper
|
// NewClientWrapper accepts an open options and returns a Client Wrapper
|
||||||
func NewClientWrapper(opts ...Option) client.Wrapper {
|
func NewClientWrapper(opts ...Option) client.Wrapper {
|
||||||
return func(c client.Client) client.Client {
|
return func(c client.Client) client.Client {
|
||||||
options := NewOptions()
|
options := NewOptions()
|
||||||
@ -270,7 +293,7 @@ func NewClientWrapper(opts ...Option) client.Wrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper
|
// NewClientCallWrapper accepts an options and returns a Call Wrapper
|
||||||
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
func NewClientCallWrapper(opts ...Option) client.CallWrapper {
|
||||||
return func(h client.CallFunc) client.CallFunc {
|
return func(h client.CallFunc) client.CallFunc {
|
||||||
options := NewOptions()
|
options := NewOptions()
|
||||||
@ -316,7 +339,7 @@ func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper
|
// NewServerSubscriberWrapper accepts an options and returns a Subscriber Wrapper
|
||||||
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
|
||||||
return func(h server.SubscriberFunc) server.SubscriberFunc {
|
return func(h server.SubscriberFunc) server.SubscriberFunc {
|
||||||
options := NewOptions()
|
options := NewOptions()
|
||||||
|
@ -112,6 +112,7 @@ func Merge(omd Metadata, mmd Metadata, overwrite bool) Metadata {
|
|||||||
return nmd
|
return nmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pairs from which metadata created
|
||||||
func Pairs(kv ...string) (Metadata, bool) {
|
func Pairs(kv ...string) (Metadata, bool) {
|
||||||
if len(kv)%2 == 1 {
|
if len(kv)%2 == 1 {
|
||||||
return nil, false
|
return nil, false
|
||||||
|
@ -11,6 +11,98 @@ import (
|
|||||||
"github.com/unistack-org/micro/v3/tracer"
|
"github.com/unistack-org/micro/v3/tracer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
||||||
|
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||||
|
var labels []tracer.Label
|
||||||
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
|
labels = make([]tracer.Label, 0, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, tracer.String(k, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, tracer.Bool("error", true))
|
||||||
|
}
|
||||||
|
sp.SetLabels(labels...)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
||||||
|
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||||
|
var labels []tracer.Label
|
||||||
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
|
labels = make([]tracer.Label, 0, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, tracer.String(k, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, tracer.Bool("error", true))
|
||||||
|
}
|
||||||
|
sp.SetLabels(labels...)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
||||||
|
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
|
||||||
|
var labels []tracer.Label
|
||||||
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
|
labels = make([]tracer.Label, 0, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, tracer.String(k, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, tracer.Bool("error", true))
|
||||||
|
}
|
||||||
|
sp.SetLabels(labels...)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
||||||
|
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||||
|
var labels []tracer.Label
|
||||||
|
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||||
|
labels = make([]tracer.Label, 0, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, tracer.String(k, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, tracer.Bool("error", true))
|
||||||
|
}
|
||||||
|
sp.SetLabels(labels...)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
||||||
|
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
|
||||||
|
var labels []tracer.Label
|
||||||
|
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||||
|
labels = make([]tracer.Label, 0, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, tracer.String(k, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, tracer.Bool("error", true))
|
||||||
|
}
|
||||||
|
sp.SetLabels(labels...)
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
||||||
|
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
||||||
|
var labels []tracer.Label
|
||||||
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
|
labels = make([]tracer.Label, 0, len(md))
|
||||||
|
for k, v := range md {
|
||||||
|
labels = append(labels, tracer.String(k, v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
labels = append(labels, tracer.Bool("error", true))
|
||||||
|
}
|
||||||
|
sp.SetLabels(labels...)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
type tWrapper struct {
|
type tWrapper struct {
|
||||||
client.Client
|
client.Client
|
||||||
serverHandler server.HandlerFunc
|
serverHandler server.HandlerFunc
|
||||||
@ -26,18 +118,28 @@ type ClientCallFuncObserver func(context.Context, string, client.Request, interf
|
|||||||
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
type ServerHandlerObserver func(context.Context, server.Request, interface{}, tracer.Span, error)
|
||||||
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
type ServerSubscriberObserver func(context.Context, server.Message, tracer.Span, error)
|
||||||
|
|
||||||
|
// Options struct
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Tracer tracer.Tracer
|
// Tracer that used for tracing
|
||||||
ClientCallObservers []ClientCallObserver
|
Tracer tracer.Tracer
|
||||||
ClientStreamObservers []ClientStreamObserver
|
// ClientCallObservers funcs
|
||||||
ClientPublishObservers []ClientPublishObserver
|
ClientCallObservers []ClientCallObserver
|
||||||
ClientCallFuncObservers []ClientCallFuncObserver
|
// ClientStreamObservers funcs
|
||||||
ServerHandlerObservers []ServerHandlerObserver
|
ClientStreamObservers []ClientStreamObserver
|
||||||
|
// ClientPublishObservers funcs
|
||||||
|
ClientPublishObservers []ClientPublishObserver
|
||||||
|
// ClientCallFuncObservers funcs
|
||||||
|
ClientCallFuncObservers []ClientCallFuncObserver
|
||||||
|
// ServerHandlerObservers funcs
|
||||||
|
ServerHandlerObservers []ServerHandlerObserver
|
||||||
|
// ServerSubscriberObservers funcs
|
||||||
ServerSubscriberObservers []ServerSubscriberObserver
|
ServerSubscriberObservers []ServerSubscriberObserver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Option func signature
|
||||||
type Option func(*Options)
|
type Option func(*Options)
|
||||||
|
|
||||||
|
// NewOptions create Options from Option slice
|
||||||
func NewOptions(opts ...Option) Options {
|
func NewOptions(opts ...Option) Options {
|
||||||
options := Options{
|
options := Options{
|
||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
@ -56,138 +158,55 @@ func NewOptions(opts ...Option) Options {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithTracer pass tracer
|
||||||
func WithTracer(t tracer.Tracer) Option {
|
func WithTracer(t tracer.Tracer) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Tracer = t
|
o.Tracer = t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientCallObservers funcs
|
||||||
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
func WithClientCallObservers(ob ...ClientCallObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientCallObservers = ob
|
o.ClientCallObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientStreamObservers funcs
|
||||||
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
func WithClientStreamObservers(ob ...ClientStreamObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientStreamObservers = ob
|
o.ClientStreamObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientPublishObservers funcs
|
||||||
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
func WithClientPublishObservers(ob ...ClientPublishObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientPublishObservers = ob
|
o.ClientPublishObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithClientCallFuncObservers funcs
|
||||||
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ClientCallFuncObservers = ob
|
o.ClientCallFuncObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithServerHandlerObservers funcs
|
||||||
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ServerHandlerObservers = ob
|
o.ServerHandlerObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithServerSubscriberObservers funcs
|
||||||
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.ServerSubscriberObservers = ob
|
o.ServerSubscriberObservers = ob
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) {
|
|
||||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
|
||||||
var labels []tracer.Label
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
||||||
labels = make([]tracer.Label, 0, len(md))
|
|
||||||
for k, v := range md {
|
|
||||||
labels = append(labels, tracer.String(k, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, tracer.Bool("error", true))
|
|
||||||
}
|
|
||||||
sp.SetLabels(labels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) {
|
|
||||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
|
||||||
var labels []tracer.Label
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
||||||
labels = make([]tracer.Label, 0, len(md))
|
|
||||||
for k, v := range md {
|
|
||||||
labels = append(labels, tracer.String(k, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, tracer.Bool("error", true))
|
|
||||||
}
|
|
||||||
sp.SetLabels(labels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) {
|
|
||||||
sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic()))
|
|
||||||
var labels []tracer.Label
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
||||||
labels = make([]tracer.Label, 0, len(md))
|
|
||||||
for k, v := range md {
|
|
||||||
labels = append(labels, tracer.String(k, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, tracer.Bool("error", true))
|
|
||||||
}
|
|
||||||
sp.SetLabels(labels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) {
|
|
||||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
|
||||||
var labels []tracer.Label
|
|
||||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
|
||||||
labels = make([]tracer.Label, 0, len(md))
|
|
||||||
for k, v := range md {
|
|
||||||
labels = append(labels, tracer.String(k, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, tracer.Bool("error", true))
|
|
||||||
}
|
|
||||||
sp.SetLabels(labels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, sp tracer.Span, err error) {
|
|
||||||
sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic()))
|
|
||||||
var labels []tracer.Label
|
|
||||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
|
||||||
labels = make([]tracer.Label, 0, len(md))
|
|
||||||
for k, v := range md {
|
|
||||||
labels = append(labels, tracer.String(k, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, tracer.Bool("error", true))
|
|
||||||
}
|
|
||||||
sp.SetLabels(labels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) {
|
|
||||||
sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()))
|
|
||||||
var labels []tracer.Label
|
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
||||||
labels = make([]tracer.Label, 0, len(md))
|
|
||||||
for k, v := range md {
|
|
||||||
labels = append(labels, tracer.String(k, v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
labels = append(labels, tracer.Bool("error", true))
|
|
||||||
}
|
|
||||||
sp.SetLabels(labels...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
sp := tracer.SpanFromContext(ctx)
|
sp := tracer.SpanFromContext(ctx)
|
||||||
defer sp.Finish()
|
defer sp.Finish()
|
||||||
|
Loading…
Reference in New Issue
Block a user