Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
3fc05ae291 | |||
0b29668fe5 |
86
grpc.go
86
grpc.go
@@ -51,6 +51,18 @@ type ServerReflection struct {
|
||||
}
|
||||
*/
|
||||
|
||||
type streamWrapper struct {
|
||||
ctx context.Context
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (w *streamWrapper) Context() context.Context {
|
||||
if w.ctx != nil {
|
||||
return w.ctx
|
||||
}
|
||||
return w.ServerStream.Context()
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
handlers map[string]server.Handler
|
||||
srv *grpc.Server
|
||||
@@ -198,32 +210,11 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||
}
|
||||
|
||||
var sp tracer.Span
|
||||
ctx, sp = g.opts.Tracer.Start(ctx, fullMethod+" rpc-server",
|
||||
tracer.WithSpanKind(tracer.SpanKindServer),
|
||||
tracer.WithSpanLabels(
|
||||
"endpoint", fullMethod,
|
||||
),
|
||||
)
|
||||
|
||||
ts := time.Now()
|
||||
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc()
|
||||
|
||||
defer func() {
|
||||
te := time.Since(ts)
|
||||
g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds())
|
||||
g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds())
|
||||
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec()
|
||||
|
||||
st := status.Convert(err)
|
||||
if st == nil || st.Code() == codes.OK {
|
||||
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
|
||||
} else {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc()
|
||||
// get grpc metadata
|
||||
gmd, ok := gmetadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
gmd = gmetadata.MD{}
|
||||
}
|
||||
sp.Finish()
|
||||
}()
|
||||
|
||||
var serviceName, methodName string
|
||||
serviceName, methodName, err = serviceMethod(fullMethod)
|
||||
@@ -232,17 +223,6 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if g.opts.Wait != nil {
|
||||
g.opts.Wait.Add(1)
|
||||
defer g.opts.Wait.Done()
|
||||
}
|
||||
|
||||
// get grpc metadata
|
||||
gmd, ok := gmetadata.FromIncomingContext(ctx)
|
||||
if !ok {
|
||||
gmd = gmetadata.MD{}
|
||||
}
|
||||
|
||||
md := metadata.New(len(gmd))
|
||||
for k, v := range gmd {
|
||||
md.Set(k, strings.Join(v, ", "))
|
||||
@@ -290,6 +270,40 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
// create new context
|
||||
ctx = metadata.NewIncomingContext(ctx, md)
|
||||
|
||||
var sp tracer.Span
|
||||
ctx, sp = g.opts.Tracer.Start(ctx, fullMethod+" rpc-server",
|
||||
tracer.WithSpanKind(tracer.SpanKindServer),
|
||||
tracer.WithSpanLabels(
|
||||
"endpoint", fullMethod,
|
||||
),
|
||||
)
|
||||
|
||||
stream = &streamWrapper{ctx, stream}
|
||||
|
||||
ts := time.Now()
|
||||
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc()
|
||||
|
||||
defer func() {
|
||||
te := time.Since(ts)
|
||||
g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds())
|
||||
g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds())
|
||||
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec()
|
||||
|
||||
st := status.Convert(err)
|
||||
if st == nil || st.Code() == codes.OK {
|
||||
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
|
||||
} else {
|
||||
sp.SetStatus(tracer.SpanStatusError, err.Error())
|
||||
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc()
|
||||
}
|
||||
sp.Finish()
|
||||
}()
|
||||
|
||||
if g.opts.Wait != nil {
|
||||
g.opts.Wait.Add(1)
|
||||
defer g.opts.Wait.Done()
|
||||
}
|
||||
|
||||
// get peer from context
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
md.Set("Remote", p.Addr.String())
|
||||
|
Reference in New Issue
Block a user