Evstigneev Denis
38c5fe8b5a
## Pull Request template Please, go through these steps before clicking submit on this PR. 1. Give a descriptive title to your PR. 2. Provide a description of your changes. 3. Make sure you have some relevant tests. 4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable). **PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING** Reviewed-on: #369 Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru> Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
272 lines
6.2 KiB
Go
272 lines
6.2 KiB
Go
package grpc_util
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.unistack.org/micro/v3/tracer"
|
|
grpc_codes "google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/peer"
|
|
"google.golang.org/grpc/stats"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
type gRPCContextKey struct{}
|
|
|
|
type gRPCContext struct {
|
|
messagesReceived int64
|
|
messagesSent int64
|
|
}
|
|
|
|
type Options struct {
|
|
Tracer tracer.Tracer
|
|
}
|
|
|
|
type Option func(*Options)
|
|
|
|
func Tracer(tr tracer.Tracer) Option {
|
|
return func(o *Options) {
|
|
o.Tracer = tr
|
|
}
|
|
}
|
|
|
|
// NewServerHandler creates a stats.Handler for gRPC server.
|
|
func NewServerHandler(opts ...Option) stats.Handler {
|
|
options := Options{Tracer: tracer.DefaultTracer}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
h := &serverHandler{
|
|
opts: options,
|
|
}
|
|
return h
|
|
}
|
|
|
|
type serverHandler struct {
|
|
opts Options
|
|
}
|
|
|
|
// TagRPC can attach some information to the given context.
|
|
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
|
name, attrs := parseFullMethod(info.FullMethodName)
|
|
attrs = append(attrs, "rpc.system", "grpc")
|
|
ctx, _ = h.opts.Tracer.Start(
|
|
ctx,
|
|
name,
|
|
tracer.WithSpanKind(tracer.SpanKindServer),
|
|
tracer.WithSpanLabels(attrs...),
|
|
)
|
|
|
|
gctx := gRPCContext{}
|
|
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
|
|
}
|
|
|
|
// HandleRPC processes the RPC stats.
|
|
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
|
|
handleRPC(ctx, rs)
|
|
}
|
|
|
|
// TagConn can attach some information to the given context.
|
|
func (h *serverHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
|
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
|
attrs := peerAttr(peerFromCtx(ctx))
|
|
span.AddLabels(attrs...)
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
// HandleConn processes the Conn stats.
|
|
func (h *serverHandler) HandleConn(_ context.Context, _ stats.ConnStats) {
|
|
}
|
|
|
|
type clientHandler struct {
|
|
opts Options
|
|
}
|
|
|
|
// NewClientHandler creates a stats.Handler for gRPC client.
|
|
func NewClientHandler(opts ...Option) stats.Handler {
|
|
options := Options{Tracer: tracer.DefaultTracer}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
h := &clientHandler{
|
|
opts: options,
|
|
}
|
|
return h
|
|
}
|
|
|
|
// TagRPC can attach some information to the given context.
|
|
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
|
|
name, attrs := parseFullMethod(info.FullMethodName)
|
|
attrs = append(attrs, "rpc.system", "grpc", "rpc.flavor", "grpc", "rpc.call", info.FullMethodName)
|
|
ctx, _ = h.opts.Tracer.Start(
|
|
ctx,
|
|
name,
|
|
tracer.WithSpanKind(tracer.SpanKindClient),
|
|
tracer.WithSpanLabels(attrs...),
|
|
)
|
|
|
|
gctx := gRPCContext{}
|
|
|
|
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
|
|
}
|
|
|
|
// HandleRPC processes the RPC stats.
|
|
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
|
|
handleRPC(ctx, rs)
|
|
}
|
|
|
|
// TagConn can attach some information to the given context.
|
|
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
|
|
// TODO
|
|
if span, ok := tracer.SpanFromContext(ctx); ok {
|
|
attrs := peerAttr(cti.RemoteAddr.String())
|
|
span.AddLabels(attrs...)
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
// HandleConn processes the Conn stats.
|
|
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
|
|
// no-op
|
|
}
|
|
|
|
func handleRPC(ctx context.Context, rs stats.RPCStats) {
|
|
span, ok := tracer.SpanFromContext(ctx)
|
|
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
|
|
var messageID int64
|
|
if rs.IsClient() {
|
|
span.AddLabels("span.kind", "client")
|
|
} else {
|
|
span.AddLabels("span.kind", "server")
|
|
}
|
|
|
|
switch rs := rs.(type) {
|
|
case *stats.Begin:
|
|
if rs.IsClientStream || rs.IsServerStream {
|
|
span.AddLabels("rpc.call_type", "stream")
|
|
} else {
|
|
span.AddLabels("rpc.call_type", "unary")
|
|
}
|
|
span.AddEvent("message",
|
|
tracer.WithEventLabels(
|
|
"message.begin_time", rs.BeginTime.Format(time.RFC3339),
|
|
),
|
|
)
|
|
case *stats.InPayload:
|
|
if gctx != nil {
|
|
messageID = atomic.AddInt64(&gctx.messagesReceived, 1)
|
|
}
|
|
if ok {
|
|
span.AddEvent("message",
|
|
tracer.WithEventLabels(
|
|
"message.recv_time", rs.RecvTime.Format(time.RFC3339),
|
|
"message.type", "RECEIVED",
|
|
"message.id", messageID,
|
|
"message.compressed_size", rs.CompressedLength,
|
|
"message.uncompressed_size", rs.Length,
|
|
),
|
|
)
|
|
}
|
|
case *stats.OutPayload:
|
|
if gctx != nil {
|
|
messageID = atomic.AddInt64(&gctx.messagesSent, 1)
|
|
}
|
|
if ok {
|
|
span.AddEvent("message",
|
|
tracer.WithEventLabels(
|
|
"message.sent_time", rs.SentTime.Format(time.RFC3339),
|
|
"message.type", "SENT",
|
|
"message.id", messageID,
|
|
"message.compressed_size", rs.CompressedLength,
|
|
"message.uncompressed_size", rs.Length,
|
|
),
|
|
)
|
|
}
|
|
case *stats.End:
|
|
if ok {
|
|
span.AddEvent("message",
|
|
tracer.WithEventLabels(
|
|
"message.begin_time", rs.BeginTime.Format(time.RFC3339),
|
|
"message.end_time", rs.EndTime.Format(time.RFC3339),
|
|
),
|
|
)
|
|
if rs.Error != nil {
|
|
s, _ := status.FromError(rs.Error)
|
|
span.SetStatus(tracer.SpanStatusError, s.Message())
|
|
span.AddLabels("rpc.grpc.status_code", s.Code())
|
|
} else {
|
|
span.AddLabels("rpc.grpc.status_code", grpc_codes.OK)
|
|
}
|
|
span.Finish()
|
|
}
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
|
|
func parseFullMethod(fullMethod string) (string, []interface{}) {
|
|
if !strings.HasPrefix(fullMethod, "/") {
|
|
// Invalid format, does not follow `/package.service/method`.
|
|
return fullMethod, nil
|
|
}
|
|
name := fullMethod[1:]
|
|
pos := strings.LastIndex(name, "/")
|
|
if pos < 0 {
|
|
// Invalid format, does not follow `/package.service/method`.
|
|
return name, nil
|
|
}
|
|
service, method := name[:pos], name[pos+1:]
|
|
|
|
var attrs []interface{}
|
|
if service != "" {
|
|
attrs = append(attrs, "rpc.service", service)
|
|
}
|
|
if method != "" {
|
|
attrs = append(attrs, "rpc.method", method)
|
|
}
|
|
return name, attrs
|
|
}
|
|
|
|
func peerAttr(addr string) []interface{} {
|
|
host, p, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
if host == "" {
|
|
host = "127.0.0.1"
|
|
}
|
|
port, err := strconv.Atoi(p)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
var attr []interface{}
|
|
if ip := net.ParseIP(host); ip != nil {
|
|
attr = []interface{}{
|
|
"net.sock.peer.addr", host,
|
|
"net.sock.peer.port", port,
|
|
}
|
|
} else {
|
|
attr = []interface{}{
|
|
"net.peer.name", host,
|
|
"net.peer.port", port,
|
|
}
|
|
}
|
|
|
|
return attr
|
|
}
|
|
|
|
func peerFromCtx(ctx context.Context) string {
|
|
p, ok := peer.FromContext(ctx)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return p.Addr.String()
|
|
}
|