Merge pull request #169 from unistack-org/tracer
tracer/wrapper: fix observers
This commit was merged in pull request #169.
	This commit is contained in:
		| @@ -13,10 +13,10 @@ import ( | |||||||
|  |  | ||||||
| var ( | var ( | ||||||
| 	DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { | 	DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) | 		sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = make([]interface{}, 0, len(md)) | 			labels = make([]interface{}, 0, len(md)+1) | ||||||
| 			for k, v := range md { | 			for k, v := range md { | ||||||
| 				labels = append(labels, k, v) | 				labels = append(labels, k, v) | ||||||
| 			} | 			} | ||||||
| @@ -24,11 +24,12 @@ var ( | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			labels = append(labels, "error", true) | 			labels = append(labels, "error", true) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, "type", "client") | ||||||
| 		sp.SetLabels(labels...) | 		sp.SetLabels(labels...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { | 	DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) | 		sp.SetName(fmt.Sprintf("Stream %s.%s", req.Service(), req.Method())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = make([]interface{}, 0, len(md)) | 			labels = make([]interface{}, 0, len(md)) | ||||||
| @@ -39,11 +40,12 @@ var ( | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			labels = append(labels, "error", true) | 			labels = append(labels, "error", true) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, "type", "client") | ||||||
| 		sp.SetLabels(labels...) | 		sp.SetLabels(labels...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { | 	DefaultClientPublishObserver = func(ctx context.Context, msg client.Message, opts []client.PublishOption, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Pub to %s", msg.Topic())) | 		sp.SetName(fmt.Sprintf("Publish %s", msg.Topic())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = make([]interface{}, 0, len(md)) | 			labels = make([]interface{}, 0, len(md)) | ||||||
| @@ -54,11 +56,12 @@ var ( | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			labels = append(labels, "error", true) | 			labels = append(labels, "error", true) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, "type", "publisher") | ||||||
| 		sp.SetLabels(labels...) | 		sp.SetLabels(labels...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { | 	DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) | 		sp.SetName(fmt.Sprintf("Handler %s.%s", req.Service(), req.Method())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromIncomingContext(ctx); ok { | 		if md, ok := metadata.FromIncomingContext(ctx); ok { | ||||||
| 			labels = make([]interface{}, 0, len(md)) | 			labels = make([]interface{}, 0, len(md)) | ||||||
| @@ -69,11 +72,12 @@ var ( | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			labels = append(labels, "error", true) | 			labels = append(labels, "error", true) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, "type", "server") | ||||||
| 		sp.SetLabels(labels...) | 		sp.SetLabels(labels...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { | 	DefaultServerSubscriberObserver = func(ctx context.Context, msg server.Message, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("Sub from %s", msg.Topic())) | 		sp.SetName(fmt.Sprintf("Subscriber %s", msg.Topic())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromIncomingContext(ctx); ok { | 		if md, ok := metadata.FromIncomingContext(ctx); ok { | ||||||
| 			labels = make([]interface{}, 0, len(md)) | 			labels = make([]interface{}, 0, len(md)) | ||||||
| @@ -84,11 +88,12 @@ var ( | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			labels = append(labels, "error", true) | 			labels = append(labels, "error", true) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, "type", "subscriber") | ||||||
| 		sp.SetLabels(labels...) | 		sp.SetLabels(labels...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { | 	DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, sp tracer.Span, err error) { | ||||||
| 		sp.SetName(fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) | 		sp.SetName(fmt.Sprintf("Call %s.%s", req.Service(), req.Method())) | ||||||
| 		var labels []interface{} | 		var labels []interface{} | ||||||
| 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | 		if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||||||
| 			labels = make([]interface{}, 0, len(md)) | 			labels = make([]interface{}, 0, len(md)) | ||||||
| @@ -99,6 +104,7 @@ var ( | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			labels = append(labels, "error", true) | 			labels = append(labels, "error", true) | ||||||
| 		} | 		} | ||||||
|  | 		labels = append(labels, "type", "client") | ||||||
| 		sp.SetLabels(labels...) | 		sp.SetLabels(labels...) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -231,7 +237,7 @@ func (ot *tWrapper) Call(ctx context.Context, req client.Request, rsp interface{ | |||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	sp, ok := tracer.SpanFromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, endpoint) | 		ctx, sp = ot.opts.Tracer.Start(ctx, "") | ||||||
| 	} | 	} | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| @@ -254,7 +260,7 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie | |||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	sp, ok := tracer.SpanFromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, endpoint) | 		ctx, sp = ot.opts.Tracer.Start(ctx, "") | ||||||
| 	} | 	} | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| @@ -270,7 +276,7 @@ func (ot *tWrapper) Stream(ctx context.Context, req client.Request, opts ...clie | |||||||
| func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { | func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	sp, ok := tracer.SpanFromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, msg.Topic()) | 		ctx, sp = ot.opts.Tracer.Start(ctx, "") | ||||||
| 	} | 	} | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| @@ -284,7 +290,7 @@ func (ot *tWrapper) Publish(ctx context.Context, msg client.Message, opts ...cli | |||||||
| } | } | ||||||
|  |  | ||||||
| func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { | func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { | ||||||
| 	endpoint := req.Endpoint() | 	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method()) | ||||||
| 	for _, ep := range ot.opts.SkipEndpoints { | 	for _, ep := range ot.opts.SkipEndpoints { | ||||||
| 		if ep == endpoint { | 		if ep == endpoint { | ||||||
| 			return ot.serverHandler(ctx, req, rsp) | 			return ot.serverHandler(ctx, req, rsp) | ||||||
| @@ -293,7 +299,7 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i | |||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	sp, ok := tracer.SpanFromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())) | 		ctx, sp = ot.opts.Tracer.Start(ctx, "") | ||||||
| 	} | 	} | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| @@ -309,7 +315,7 @@ func (ot *tWrapper) ServerHandler(ctx context.Context, req server.Request, rsp i | |||||||
| func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { | func (ot *tWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	sp, ok := tracer.SpanFromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, msg.Topic()) | 		ctx, sp = ot.opts.Tracer.Start(ctx, "") | ||||||
| 	} | 	} | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
| @@ -347,7 +353,7 @@ func NewClientCallWrapper(opts ...Option) client.CallWrapper { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { | func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { | ||||||
| 	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) | 	endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Method()) | ||||||
| 	for _, ep := range ot.opts.SkipEndpoints { | 	for _, ep := range ot.opts.SkipEndpoints { | ||||||
| 		if ep == endpoint { | 		if ep == endpoint { | ||||||
| 			return ot.ClientCallFunc(ctx, addr, req, rsp, opts) | 			return ot.ClientCallFunc(ctx, addr, req, rsp, opts) | ||||||
| @@ -356,7 +362,7 @@ func (ot *tWrapper) ClientCallFunc(ctx context.Context, addr string, req client. | |||||||
|  |  | ||||||
| 	sp, ok := tracer.SpanFromContext(ctx) | 	sp, ok := tracer.SpanFromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		ctx, sp = ot.opts.Tracer.Start(ctx, endpoint) | 		ctx, sp = ot.opts.Tracer.Start(ctx, "") | ||||||
| 	} | 	} | ||||||
| 	defer sp.Finish() | 	defer sp.Finish() | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user