Compare commits
	
		
			10 Commits
		
	
	
		
			v4.1.0
			...
			36c52990ee
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 36c52990ee | |||
| 7ad92816ef | |||
| c47d04db7f | |||
| 80a95aae8f | |||
| 5a9b5e23fc | |||
| 11e3ab33b5 | |||
| 1f8a64c7d6 | |||
| 7969cfd8c9 | |||
| 9526345cd6 | |||
| e3fe27105b | 
							
								
								
									
										74
									
								
								grpc.go
									
									
									
									
									
								
							
							
						
						
									
										74
									
								
								grpc.go
									
									
									
									
									
								
							@@ -428,37 +428,6 @@ func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts
 | 
			
		||||
	return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
 | 
			
		||||
	if req == nil {
 | 
			
		||||
		return errors.InternalServerError("go.micro.client", "req is nil")
 | 
			
		||||
	} else if rsp == nil {
 | 
			
		||||
		return errors.InternalServerError("go.micro.client", "rsp is nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
 | 
			
		||||
	var sp tracer.Span
 | 
			
		||||
	ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
 | 
			
		||||
		tracer.WithSpanKind(tracer.SpanKindClient),
 | 
			
		||||
		tracer.WithSpanLabels("endpoint", req.Endpoint()),
 | 
			
		||||
	)
 | 
			
		||||
	err := g.funcCall(ctx, req, rsp, opts...)
 | 
			
		||||
	g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
 | 
			
		||||
	g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
 | 
			
		||||
 | 
			
		||||
	if me := errors.FromError(err); me == nil {
 | 
			
		||||
		sp.Finish()
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
 | 
			
		||||
	} else {
 | 
			
		||||
		sp.SetStatus(tracer.SpanStatusError, err.Error())
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
 | 
			
		||||
	// make a copy of call opts
 | 
			
		||||
	callOpts := g.opts.CallOptions
 | 
			
		||||
@@ -589,7 +558,13 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
 | 
			
		||||
	return gerr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
 | 
			
		||||
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
 | 
			
		||||
	if req == nil {
 | 
			
		||||
		return errors.InternalServerError("go.micro.client", "req is nil")
 | 
			
		||||
	} else if rsp == nil {
 | 
			
		||||
		return errors.InternalServerError("go.micro.client", "rsp is nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
 | 
			
		||||
	var sp tracer.Span
 | 
			
		||||
@@ -597,21 +572,21 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
 | 
			
		||||
		tracer.WithSpanKind(tracer.SpanKindClient),
 | 
			
		||||
		tracer.WithSpanLabels("endpoint", req.Endpoint()),
 | 
			
		||||
	)
 | 
			
		||||
	stream, err := g.funcStream(ctx, req, opts...)
 | 
			
		||||
	err := g.funcCall(ctx, req, rsp, opts...)
 | 
			
		||||
	g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
 | 
			
		||||
	g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
 | 
			
		||||
 | 
			
		||||
	if me := status.Convert(err); me == nil {
 | 
			
		||||
	if me := errors.FromError(err); me == nil {
 | 
			
		||||
		sp.Finish()
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
 | 
			
		||||
	} else {
 | 
			
		||||
		sp.SetStatus(tracer.SpanStatusError, err.Error())
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return stream, err
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
 | 
			
		||||
@@ -742,6 +717,31 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
 | 
			
		||||
	return nil, grr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
 | 
			
		||||
	ts := time.Now()
 | 
			
		||||
	g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
 | 
			
		||||
	var sp tracer.Span
 | 
			
		||||
	ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
 | 
			
		||||
		tracer.WithSpanKind(tracer.SpanKindClient),
 | 
			
		||||
		tracer.WithSpanLabels("endpoint", req.Endpoint()),
 | 
			
		||||
	)
 | 
			
		||||
	stream, err := g.funcStream(ctx, req, opts...)
 | 
			
		||||
	g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
 | 
			
		||||
	te := time.Since(ts)
 | 
			
		||||
	g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
 | 
			
		||||
	g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
 | 
			
		||||
 | 
			
		||||
	if me := status.Convert(err); me == nil {
 | 
			
		||||
		sp.Finish()
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
 | 
			
		||||
	} else {
 | 
			
		||||
		sp.SetStatus(tracer.SpanStatusError, err.Error())
 | 
			
		||||
		g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return stream, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *grpcClient) String() string {
 | 
			
		||||
	return "grpc"
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user