diff --git a/grpc.go b/grpc.go index cbe13a0..53444e8 100644 --- a/grpc.go +++ b/grpc.go @@ -21,7 +21,9 @@ import ( "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/register" + "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/server" + msync "go.unistack.org/micro/v3/sync" "golang.org/x/net/netutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -47,7 +49,7 @@ type Server struct { handlers map[string]server.Handler srv *grpc.Server exit chan chan error - wg *sync.WaitGroup + wg *msync.WaitGroup rsvc *register.Service subscribers map[*subscriber][]broker.Subscriber rpc *rServer @@ -192,12 +194,30 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption { return opts } -func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) { +func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { + var err error + fullMethod, ok := grpc.MethodFromServerStream(stream) if !ok { return status.Errorf(codes.Internal, "method does not exist in context") } + ts := time.Now() + g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc() + defer func() { + te := time.Since(ts) + g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds()) + g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds()) + g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec() + + st := status.Convert(err) + if st == nil || st.Code() == codes.OK { + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc() + } else { + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc() + } + }() + serviceName, methodName, err := serviceMethod(fullMethod) if err != nil { return status.New(codes.InvalidArgument, err.Error()).Err()