From d4a2dd918fcb9301b38f1b736a9bf0193d77054b Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 6 Apr 2024 22:32:12 +0300 Subject: [PATCH] meter support Signed-off-by: Vasiliy Tolstov --- generate.go | 4 +++ go.mod | 10 +++---- go.sum | 10 +++++++ grpc.go | 73 ++++++++++++++++++++++++++++------------------ reflection.go | 10 +++---- reflection_test.go | 62 +++++++++++++++++++++++++++++++++++++++ request.go | 33 +-------------------- subscriber.go | 35 +++++++++++++++++++++- 8 files changed, 164 insertions(+), 73 deletions(-) create mode 100644 generate.go create mode 100644 reflection_test.go diff --git a/generate.go b/generate.go new file mode 100644 index 0000000..5ea1f90 --- /dev/null +++ b/generate.go @@ -0,0 +1,4 @@ +package grpc + +//go:generate go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +//go:generate sh -c "protoc -I./proto -I$(go list -f '{{ .Dir }}' -m go.unistack.org/micro-proto/v3) -I. --go-grpc_out=paths=source_relative:./proto --go_out=paths=source_relative:./proto proto/test.proto" diff --git a/go.mod b/go.mod index 3d12650..d3d0cd7 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,14 @@ go 1.20 require ( github.com/golang/protobuf v1.5.4 - go.unistack.org/micro/v3 v3.10.52 - golang.org/x/net v0.22.0 - google.golang.org/grpc v1.62.1 + go.unistack.org/micro/v3 v3.10.54 + golang.org/x/net v0.24.0 + google.golang.org/grpc v1.63.0 google.golang.org/protobuf v1.33.0 ) require ( - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect ) diff --git a/go.sum b/go.sum index 84ae8d4..d5dea2f 100644 --- a/go.sum +++ b/go.sum @@ -9,14 +9,20 @@ go.unistack.org/micro/v3 v3.10.42 h1:A0nA6WT6wNq5fyQyzliX70Bj5/SGj5kadLSOySX4hro go.unistack.org/micro/v3 v3.10.42/go.mod h1:CSmEf5ddmft94MyKHnUSMM0W5dpmmTVbgImbgQWV5Ak= go.unistack.org/micro/v3 v3.10.52 h1:6LlAvLLlf+3JLCEQEVNQWi7DXCoI1ocuOqqoEPj5S+k= go.unistack.org/micro/v3 v3.10.52/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= +go.unistack.org/micro/v3 v3.10.54 h1:3qbv7jg+wpcYG/nJXzE/GEIsM8i5UdpytL2cNE8i3y0= +go.unistack.org/micro/v3 v3.10.54/go.mod h1:erMgt3Bl7vQQ0e9UpQyR5NlLiZ9pKeEJ9+1tfYFaqUg= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -26,10 +32,14 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c h1:lfpJ/2rWPa/kJgxyyXM8PrNnfCzcmxJ265mADgwmvLI= google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8= +google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/grpc.go b/grpc.go index b84ccae..41b70d5 100644 --- a/grpc.go +++ b/grpc.go @@ -13,6 +13,9 @@ import ( "sync" "time" + greflection "google.golang.org/grpc/reflection" + reflectionv1pb "google.golang.org/grpc/reflection/grpc_reflection_v1" + // nolint: staticcheck oldproto "github.com/golang/protobuf/proto" "go.unistack.org/micro/v3/broker" @@ -20,7 +23,9 @@ import ( "go.unistack.org/micro/v3/errors" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/register" + "go.unistack.org/micro/v3/semconv" "go.unistack.org/micro/v3/server" msync "go.unistack.org/micro/v3/sync" "golang.org/x/net/netutil" @@ -72,6 +77,8 @@ func newServer(opts ...server.Option) *Server { exit: make(chan chan error), } + g.opts.Meter = g.opts.Meter.Clone(meter.Labels("type", "grpc")) + return g } @@ -152,8 +159,15 @@ func (g *Server) configure(opts ...server.Option) error { } g.srv = grpc.NewServer(gopts...) - if v, ok := g.opts.Context.Value(reflectionKey{}).(bool); ok { - g.reflection = v + if v, ok := g.opts.Context.Value(reflectionKey{}).(Reflector); ok { + reflectionv1pb.RegisterServerReflectionServer( + g.srv, + greflection.NewServerV1(greflection.ServerOptions{ + Services: v, + DescriptorResolver: v, + ExtensionResolver: v, + }), + ) } if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok { @@ -191,14 +205,24 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption { return opts } -func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) { +func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error { fullMethod, ok := grpc.MethodFromServerStream(stream) if !ok { return status.Errorf(codes.Internal, "method does not exist in context") } + ts := time.Now() + g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc() + defer func() { + te := time.Since(ts) + g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds()) + g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds()) + g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec() + }() + serviceName, methodName, err := serviceMethod(fullMethod) if err != nil { + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(codes.InvalidArgument))).Inc() return status.New(codes.InvalidArgument, err.Error()).Err() } @@ -268,7 +292,8 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) // set the timeout if we have it if len(td) > 0 { - if n, err := strconv.ParseUint(td, 10, 64); err == nil { + var n uint64 + if n, err = strconv.ParseUint(td, 10, 64); err == nil { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Duration(n)) defer cancel() @@ -279,32 +304,11 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) svc := g.rpc.serviceMap[serviceName] g.rpc.mu.RUnlock() - /* - if svc == nil && g.reflection && methodName == "ServerReflectionInfo" { - rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}} - svc = &service{} - svc.typ = reflect.TypeOf(rfl) - svc.rcvr = reflect.ValueOf(rfl) - svc.name = reflect.Indirect(svc.rcvr).Type().Name() - svc.method = make(map[string]*methodType) - typ := reflect.TypeOf(rfl) - if me, ok := typ.MethodByName("ServerReflectionInfo"); ok { - g.rpc.mu.Lock() - ep, err := prepareEndpoint(me) - if ep != nil && err != nil { - svc.method["ServerReflectionInfo"] = ep - } else if err != nil { - return status.New(codes.Unimplemented, err.Error()).Err() - } - g.rpc.mu.Unlock() - } - } - */ - if svc == nil { if g.unknownHandler != nil { return g.unknownHandler(srv, stream) } + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(codes.Unimplemented))).Inc() return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err() } @@ -313,16 +317,27 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) if g.unknownHandler != nil { return g.unknownHandler(srv, stream) } + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(codes.Unimplemented))).Inc() return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err() } // process unary if !mtype.stream { - return g.processRequest(ctx, stream, svc, mtype, ct) + err = g.processRequest(ctx, stream, svc, mtype, ct) + } else { + // process stream + err = g.processStream(ctx, stream, svc, mtype, ct) } - // process stream - return g.processStream(ctx, stream, svc, mtype, ct) + st := status.Convert(err) + + if st == nil || st.Code() == codes.OK { + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc() + } else { + g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc() + } + + return err } func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error { diff --git a/reflection.go b/reflection.go index 1935d12..b29fec0 100644 --- a/reflection.go +++ b/reflection.go @@ -14,10 +14,8 @@ type Reflector interface { const ( // ReflectV1ServiceName is the fully-qualified name of the v1 version of the reflection service. ReflectV1ServiceName = "grpc.reflection.v1.ServerReflection" - // ReflectV1AlphaServiceName is the fully-qualified name of the v1alpha version of the reflection service. - ReflectV1AlphaServiceName = "grpc.reflection.v1alpha.ServerReflection" - - ReflectServiceURLPathV1 = "/" + ReflectV1ServiceName + "/" - ReflectServiceURLPathV1Alpha = "/" + ReflectV1AlphaServiceName + "/" - ReflectMethodName = "ServerReflectionInfo" + // ReflectServiceURLPathV1 is the full path for reflection service endpoint + ReflectServiceURLPathV1 = "/" + ReflectV1ServiceName + "/" + // ReflectMethodName is the reflection service name + ReflectMethodName = "ServerReflectionInfo" ) diff --git a/reflection_test.go b/reflection_test.go new file mode 100644 index 0000000..c769c7b --- /dev/null +++ b/reflection_test.go @@ -0,0 +1,62 @@ +package grpc + +import ( + "fmt" + "testing" + + _ "go.unistack.org/micro-server-grpc/v3/proto" + "go.unistack.org/micro/v3/server" + "google.golang.org/grpc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" +) + +type reflector struct{} + +func (r *reflector) FindFileByPath(path string) (protoreflect.FileDescriptor, error) { + fd, err := protoregistry.GlobalFiles.FindFileByPath(path) + if err != nil { + fmt.Printf("err: %v\n", err) + return nil, err + } + return fd, nil +} + +func (r *reflector) FindDescriptorByName(name protoreflect.FullName) (protoreflect.Descriptor, error) { + fd, err := protoregistry.GlobalFiles.FindDescriptorByName(name) + if err != nil { + return nil, err + } + return fd, nil +} + +func (r *reflector) GetServiceInfo() map[string]grpc.ServiceInfo { + fmt.Printf("GetServiceInfo\n") + return nil +} + +func (r *reflector) FindExtensionByName(field protoreflect.FullName) (protoreflect.ExtensionType, error) { + fmt.Printf("FindExtensionByName field %#+v\n", field) + return nil, nil +} + +func (r *reflector) FindExtensionByNumber(message protoreflect.FullName, field protoreflect.FieldNumber) (protoreflect.ExtensionType, error) { + fmt.Printf("FindExtensionByNumber message %#+v field %#+v\n", message, field) + return nil, nil +} + +func (r *reflector) RangeExtensionsByMessage(message protoreflect.FullName, f func(protoreflect.ExtensionType) bool) { + fmt.Printf("RangeExtensionsByMessage\n") +} + +func TestReflector(t *testing.T) { + srv := NewServer(Reflection(&reflector{}), server.Address(":12345")) + if err := srv.Init(); err != nil { + t.Fatal(err) + } + if err := srv.Start(); err != nil { + t.Fatal(err) + } + t.Logf("addr %s", srv.Options().Address) + select {} +} diff --git a/request.go b/request.go index 29cbeb4..b007673 100644 --- a/request.go +++ b/request.go @@ -8,10 +8,7 @@ import ( "go.unistack.org/micro/v3/server" ) -var ( - _ server.Request = &rpcRequest{} - _ server.Message = &rpcMessage{} -) +var _ server.Request = &rpcRequest{} type rpcRequest struct { rw io.ReadWriter @@ -25,14 +22,6 @@ type rpcRequest struct { stream bool } -type rpcMessage struct { - payload interface{} - codec codec.Codec - header metadata.Metadata - topic string - contentType string -} - func (r *rpcRequest) ContentType() string { return r.contentType } @@ -72,23 +61,3 @@ func (r *rpcRequest) Stream() bool { func (r *rpcRequest) Body() interface{} { return r.payload } - -func (r *rpcMessage) ContentType() string { - return r.contentType -} - -func (r *rpcMessage) Topic() string { - return r.topic -} - -func (r *rpcMessage) Body() interface{} { - return r.payload -} - -func (r *rpcMessage) Header() metadata.Metadata { - return r.header -} - -func (r *rpcMessage) Codec() codec.Codec { - return r.codec -} diff --git a/subscriber.go b/subscriber.go index 677f37c..79ada2a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -7,11 +7,45 @@ import ( "strings" "go.unistack.org/micro/v3/broker" + "go.unistack.org/micro/v3/codec" + + // "go.unistack.org/micro/v3/errors" + // "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/register" "go.unistack.org/micro/v3/server" ) +var _ server.Message = &rpcMessage{} + +type rpcMessage struct { + payload interface{} + codec codec.Codec + header metadata.Metadata + topic string + contentType string +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Body() interface{} { + return r.payload +} + +func (r *rpcMessage) Header() metadata.Metadata { + return r.header +} + +func (r *rpcMessage) Codec() codec.Codec { + return r.codec +} + type handler struct { reqType reflect.Type ctxType reflect.Type @@ -101,7 +135,6 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { return func(p broker.Event) (err error) { - msg := p.Message() // if we don't have headers, create empty map if msg.Header == nil {