add metrics and tracing
Some checks failed
build / test (push) Failing after 1m53s
build / lint (push) Successful in 9m16s
codeql / analyze (go) (push) Failing after 3m13s

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2024-04-23 08:01:44 +03:00
parent e6e64ff070
commit eecc3854b2
4 changed files with 99 additions and 125 deletions

125
grpc.go
View File

@@ -24,10 +24,12 @@ import (
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/options"
"go.unistack.org/micro/v3/register"
"go.unistack.org/micro/v3/semconv"
"go.unistack.org/micro/v3/server"
msync "go.unistack.org/micro/v3/sync"
"go.unistack.org/micro/v3/tracer"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -106,25 +108,6 @@ func (g *Server) configure(opts ...server.Option) error {
o(&g.opts)
}
if err := g.opts.Register.Init(); err != nil {
return err
}
if err := g.opts.Broker.Init(); err != nil {
return err
}
if err := g.opts.Tracer.Init(); err != nil {
return err
}
if err := g.opts.Logger.Init(); err != nil {
return err
}
if err := g.opts.Meter.Init(); err != nil {
return err
}
if err := g.opts.Transport.Init(); err != nil {
return err
}
if g.opts.Context != nil {
if codecs, ok := g.opts.Context.Value(codecsKey{}).(map[string]encoding.Codec); ok && codecs != nil {
for k, v := range codecs {
@@ -207,13 +190,25 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption {
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
var err error
ctx := stream.Context()
fullMethod, ok := grpc.MethodFromServerStream(stream)
if !ok {
return status.Errorf(codes.Internal, "method does not exist in context")
}
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, fullMethod+" rpc-server",
tracer.WithSpanKind(tracer.SpanKindServer),
tracer.WithSpanLabels(
"endpoint", fullMethod,
),
)
ts := time.Now()
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc()
defer func() {
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds())
@@ -224,8 +219,10 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
if st == nil || st.Code() == codes.OK {
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc()
}
sp.Finish()
}()
var serviceName, methodName string
@@ -241,7 +238,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
}
// get grpc metadata
gmd, ok := gmetadata.FromIncomingContext(stream.Context())
gmd, ok := gmetadata.FromIncomingContext(ctx)
if !ok {
gmd = gmetadata.MD{}
}
@@ -291,10 +288,10 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
}
// create new context
ctx := metadata.NewIncomingContext(stream.Context(), md)
ctx = metadata.NewIncomingContext(ctx, md)
// get peer from context
if p, ok := peer.FromContext(stream.Context()); ok {
if p, ok := peer.FromContext(ctx); ok {
md.Set("Remote", p.Addr.String())
ctx = peer.NewContext(ctx, p)
}
@@ -392,10 +389,11 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
return err
}
// wrap the handler func
for i := len(g.opts.HdlrWrappers); i > 0; i-- {
fn = g.opts.HdlrWrappers[i-1](fn)
}
g.opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
})
statusCode := codes.OK
statusDesc := ""
@@ -428,7 +426,7 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Warn(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
config.Logger.Error(config.Context, "handler error will not be transferred properly, must return *errors.Error or proto.Message")
}
// default case user pass own error type that not proto based
statusCode = convertCode(verr)
@@ -475,9 +473,11 @@ func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, se
return nil
}
for i := len(opts.HdlrWrappers); i > 0; i-- {
fn = opts.HdlrWrappers[i-1](fn)
}
opts.Hooks.EachNext(func(hook options.Hook) {
if h, ok := hook.(server.HookHandler); ok {
fn = h(fn)
}
})
statusCode := codes.OK
statusDesc := ""
@@ -645,7 +645,7 @@ func (g *Server) Register() error {
if !registered {
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID)
config.Logger.Info(config.Context, fmt.Sprintf("Register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].ID))
}
}
@@ -681,7 +681,7 @@ func (g *Server) Deregister() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Deregistering node: %s", service.Nodes[0].ID)
config.Logger.Info(config.Context, "Deregistering node: "+service.Nodes[0].ID)
}
if err := server.DefaultDeregisterFunc(service, config); err != nil {
@@ -705,11 +705,11 @@ func (g *Server) Deregister() error {
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", s.Topic())
config.Logger.Info(config.Context, "Unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(g.opts.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Unsubscribing from topic: %s err: %v", s.Topic(), err)
config.Logger.Error(config.Context, "Unsubscribing from topic: "+s.Topic(), err)
}
}
}(sub)
@@ -756,7 +756,7 @@ func (g *Server) Start() error {
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Server [grpc] Listening on %s", ts.Addr().String())
config.Logger.Info(config.Context, "Server [grpc] Listening on "+ts.Addr().String())
}
g.Lock()
g.opts.Address = ts.Addr().String()
@@ -770,13 +770,13 @@ func (g *Server) Start() error {
// connect to the broker
if err = config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Broker [%s] connect error: %v", config.Broker.String(), err)
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
@@ -784,13 +784,13 @@ func (g *Server) Start() error {
// nolint: nestif
if err = g.opts.RegisterCheck(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), err)
}
} else {
// announce self to the world
if err = g.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server register error: %v", err)
config.Logger.Error(config.Context, "Server register error", err)
}
}
}
@@ -803,11 +803,11 @@ func (g *Server) Start() error {
go func() {
if err = g.srv.Serve(ts); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "gRPC Server start error: %v", err)
config.Logger.Error(config.Context, "gRPC Server start error", err)
}
if err = g.Stop(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "gRPC Server stop error: %v", err)
config.Logger.Error(config.Context, "gRPC Server stop error", err)
}
}
}
@@ -837,23 +837,23 @@ func (g *Server) Start() error {
// nolint: nestif
if rerr != nil && registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s, deregister it", config.Name, config.ID, rerr)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error, deregister it", config.Name, config.ID), rerr)
}
// deregister self in case of error
if err = g.Deregister(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s deregister error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s deregister error", config.Name, config.ID), err)
}
}
} else if rerr != nil && !registered {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, rerr)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register check error", config.Name, config.ID), rerr)
}
continue
}
if err = g.Register(); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Server %s-%s register error: %s", config.Name, config.ID, err)
config.Logger.Error(config.Context, fmt.Sprintf("Server %s-%s register error", config.Name, config.ID), err)
}
}
// wait for exit
@@ -892,12 +892,12 @@ func (g *Server) Start() error {
ch <- nil
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
config.Logger.Info(config.Context, fmt.Sprintf("broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()))
}
// disconnect broker
if err = config.Broker.Disconnect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "Broker [%s] disconnect error: %v", config.Broker.String(), err)
config.Logger.Error(config.Context, fmt.Sprintf("broker [%s] disconnect error", config.Broker.String()), err)
}
}
}()
@@ -910,37 +910,6 @@ func (g *Server) Start() error {
return nil
}
func (g *Server) subscribe() error {
config := g.opts
for sb := range g.subscribers {
handler := g.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
subCtx := config.Context
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts = append(opts, broker.SubscribeContext(subCtx))
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
g.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (g *Server) Stop() error {
g.RLock()
if !g.started {