meter support
Some checks failed
build / test (push) Failing after 1m37s
codeql / analyze (go) (push) Failing after 1m48s
build / lint (push) Has been cancelled

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-04-06 22:32:12 +03:00
parent 8c42fbb18b
commit d4a2dd918f
8 changed files with 164 additions and 73 deletions

73
grpc.go
View File

@@ -13,6 +13,9 @@ import (
"sync"
"time"
greflection "google.golang.org/grpc/reflection"
reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1"
// nolint: staticcheck
oldproto "github.com/golang/protobuf/proto"
"go.unistack.org/micro/v3/broker"
@@ -20,7 +23,9 @@ import (
"go.unistack.org/micro/v3/errors"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"golang.org/x/net/netutil"
@@ -72,6 +77,8 @@ func newServer(opts ...server.Option) *Server {
exit: make(chan chan error),
}
g.opts.Meter = g.opts.Meter.Clone(meter.Labels("type", "grpc"))
return g
}
@@ -152,8 +159,15 @@ func (g *Server) configure(opts ...server.Option) error {
}
g.srv = grpc.NewServer(gopts...)
if v, ok := g.opts.Context.Value(reflectionKey{}).(bool); ok {
g.reflection = v
if v, ok := g.opts.Context.Value(reflectionKey{}).(Reflector); ok {
reflectionv1pb.RegisterServerReflectionServer(
g.srv,
greflection.NewServerV1(greflection.ServerOptions{
Services: v,
DescriptorResolver: v,
ExtensionResolver: v,
}),
)
}
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
@@ -191,14 +205,24 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption {
return opts
}
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
fullMethod, ok := grpc.MethodFromServerStream(stream)
if !ok {
return status.Errorf(codes.Internal, "method does not exist in context")
}
ts := time.Now()
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc()
defer func() {
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds())
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec()
}()
serviceName, methodName, err := serviceMethod(fullMethod)
if err != nil {
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(codes.InvalidArgument))).Inc()
return status.New(codes.InvalidArgument, err.Error()).Err()
}
@@ -268,7 +292,8 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
// set the timeout if we have it
if len(td) > 0 {
if n, err := strconv.ParseUint(td, 10, 64); err == nil {
var n uint64
if n, err = strconv.ParseUint(td, 10, 64); err == nil {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
defer cancel()
@@ -279,32 +304,11 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
svc := g.rpc.serviceMap[serviceName]
g.rpc.mu.RUnlock()
/*
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
svc = &service{}
svc.typ = reflect.TypeOf(rfl)
svc.rcvr = reflect.ValueOf(rfl)
svc.name = reflect.Indirect(svc.rcvr).Type().Name()
svc.method = make(map[string]*methodType)
typ := reflect.TypeOf(rfl)
if me, ok := typ.MethodByName("ServerReflectionInfo"); ok {
g.rpc.mu.Lock()
ep, err := prepareEndpoint(me)
if ep != nil && err != nil {
svc.method["ServerReflectionInfo"] = ep
} else if err != nil {
return status.New(codes.Unimplemented, err.Error()).Err()
}
g.rpc.mu.Unlock()
}
}
*/
if svc == nil {
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(codes.Unimplemented))).Inc()
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
}
@@ -313,16 +317,27 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
}
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(codes.Unimplemented))).Inc()
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
}
// process unary
if !mtype.stream {
return g.processRequest(ctx, stream, svc, mtype, ct)
err = g.processRequest(ctx, stream, svc, mtype, ct)
} else {
// process stream
err = g.processStream(ctx, stream, svc, mtype, ct)
}
// process stream
return g.processStream(ctx, stream, svc, mtype, ct)
st := status.Convert(err)
if st == nil || st.Code() == codes.OK {
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc()
}
return err
}
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {