diff --git a/client/noop.go b/client/noop.go index b5ae7791..5fb8d571 100644 --- a/client/noop.go +++ b/client/noop.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strconv" "time" "go.unistack.org/micro/v3/broker" @@ -12,6 +13,8 @@ import ( "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/options" "go.unistack.org/micro/v3/selector" + "go.unistack.org/micro/v3/semconv" + "go.unistack.org/micro/v3/tracer" ) // DefaultCodecs will be used to encode/decode data @@ -104,10 +107,13 @@ func (n *noopResponse) Read() ([]byte, error) { return nil, nil } -type noopStream struct{} +type noopStream struct { + err error + ctx context.Context +} func (n *noopStream) Context() context.Context { - return context.Background() + return n.ctx } func (n *noopStream) Request() Request { @@ -135,15 +141,21 @@ func (n *noopStream) RecvMsg(interface{}) error { } func (n *noopStream) Error() error { - return nil + return n.err } func (n *noopStream) Close() error { - return nil + if sp, ok := tracer.SpanFromContext(n.ctx); ok && sp != nil { + if n.err != nil { + sp.SetStatus(tracer.SpanStatusError, n.err.Error()) + } + sp.Finish() + } + return n.err } func (n *noopStream) CloseSend() error { - return nil + return n.err } func (n *noopMessage) Topic() string { @@ -207,7 +219,28 @@ func (n *noopClient) String() string { } func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { - return n.funcCall(ctx, req, rsp, opts...) + ts := time.Now() + n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() + var sp tracer.Span + ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels("endpoint", req.Endpoint()), + ) + err := n.funcCall(ctx, req, rsp, opts...) + n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec() + te := time.Since(ts) + n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + + if me := errors.FromError(err); me == nil { + sp.Finish() + n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc() + } else { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc() + } + + return err } func (n *noopClient) fnCall(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { @@ -349,7 +382,28 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp } func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { - return n.funcStream(ctx, req, opts...) + ts := time.Now() + n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() + var sp tracer.Span + ctx, sp = n.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", + tracer.WithSpanKind(tracer.SpanKindClient), + tracer.WithSpanLabels("endpoint", req.Endpoint()), + ) + stream, err := n.funcStream(ctx, req, opts...) + n.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec() + te := time.Since(ts) + n.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + n.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds()) + + if me := errors.FromError(err); me == nil { + sp.Finish() + n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc() + } else { + sp.SetStatus(tracer.SpanStatusError, err.Error()) + n.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc() + } + + return stream, err } func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { @@ -493,7 +547,7 @@ func (n *noopClient) fnStream(ctx context.Context, req Request, opts ...CallOpti } func (n *noopClient) stream(ctx context.Context, addr string, req Request, opts CallOptions) (Stream, error) { - return &noopStream{}, nil + return &noopStream{ctx: ctx}, nil } func (n *noopClient) BatchPublish(ctx context.Context, ps []Message, opts ...PublishOption) error { diff --git a/errors/errors.go b/errors/errors.go index e9f0e177..b59034e6 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -262,6 +262,10 @@ func CodeIn(err interface{}, codes ...int32) bool { // FromError try to convert go error to *Error func FromError(err error) *Error { + if err == nil { + return nil + } + if verr, ok := err.(*Error); ok && verr != nil { return verr }