correcting grpc.go

This commit is contained in:
2025-02-27 18:35:25 +03:00
parent 80a95aae8f
commit c47d04db7f
3 changed files with 71 additions and 85 deletions

14
go.mod
View File

@@ -1,6 +1,6 @@
module go.unistack.org/micro-client-grpc/v3 module go.unistack.org/micro-client-grpc/v4
go 1.22.0 go 1.23.0
require ( require (
go.unistack.org/micro/v4 v4.1.2 go.unistack.org/micro/v4 v4.1.2
@@ -13,10 +13,10 @@ require (
github.com/matoous/go-nanoid v1.5.1 // indirect github.com/matoous/go-nanoid v1.5.1 // indirect
github.com/spf13/cast v1.7.1 // indirect github.com/spf13/cast v1.7.1 // indirect
go.unistack.org/micro-proto/v4 v4.1.0 // indirect go.unistack.org/micro-proto/v4 v4.1.0 // indirect
golang.org/x/net v0.34.0 // indirect golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.29.0 // indirect golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.21.0 // indirect golang.org/x/text v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 // indirect
google.golang.org/protobuf v1.36.3 // indirect google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

20
go.sum
View File

@@ -38,18 +38,18 @@ go.unistack.org/micro-proto/v4 v4.1.0 h1:qPwL2n/oqh9RE3RTTDgt28XK3QzV597VugQPaw9
go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec= go.unistack.org/micro-proto/v4 v4.1.0/go.mod h1:ArmK7o+uFvxSY3dbJhKBBX4Pm1rhWdLEFf3LxBrMtec=
go.unistack.org/micro/v4 v4.1.2 h1:9SOlPYyPNNFpg1A7BsvhDyQm3gysLH1AhWbDCp1hyoY= go.unistack.org/micro/v4 v4.1.2 h1:9SOlPYyPNNFpg1A7BsvhDyQm3gysLH1AhWbDCp1hyoY=
go.unistack.org/micro/v4 v4.1.2/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw= go.unistack.org/micro/v4 v4.1.2/go.mod h1:lr3oYED8Ay1vjK68QqRw30QOtdk/ffpZqMFDasOUhKw=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99 h1:ZSlhAUqC4r8TPzqLXQ0m3upBNZeF+Y8jQ3c4CR3Ujms=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= google.golang.org/genproto/googleapis/rpc v0.0.0-20250224174004-546df14abb99/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

114
grpc.go
View File

@@ -5,6 +5,11 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/semconv"
"go.unistack.org/micro/v4/tracer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net" "net"
"reflect" "reflect"
"strconv" "strconv"
@@ -18,12 +23,10 @@ import (
"go.unistack.org/micro/v4/metadata" "go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/selector" "go.unistack.org/micro/v4/selector"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata" gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
) )
const ( const (
@@ -31,8 +34,6 @@ const (
) )
type grpcClient struct { type grpcClient struct {
funcPublish client.FuncPublish
funcBatchPublish client.FuncBatchPublish
funcCall client.FuncCall funcCall client.FuncCall
funcStream client.FuncStream funcStream client.FuncStream
pool *ConnPool pool *ConnPool
@@ -76,10 +77,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
var header map[string][]string var header map[string][]string
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = make(map[string][]string, len(md)) header = metadata.Copy(md)
for k, v := range md {
header[strings.ToLower(k)] = v
}
} else { } else {
header = make(map[string][]string, 2) header = make(map[string][]string, 2)
} }
@@ -89,12 +87,11 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} }
} }
// set timeout in nanoseconds // set timeout in nanoseconds
header["Grpc-Timeout"] = append(header["Grpc-Timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
header["content-type"] = append(header["content-type"], req.ContentType()) header["content-type"] = append(header["content-type"], req.ContentType())
md := gmetadata.MD(metadata.Copy(header)) ctx = gmetadata.NewOutgoingContext(ctx, header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
cf, err := g.newCodec(req.ContentType()) cf, err := g.newCodec(req.ContentType())
if err != nil { if err != nil {
@@ -180,7 +177,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
if opts.ResponseMetadata != nil { if opts.ResponseMetadata != nil {
*opts.ResponseMetadata = metadata.New(gmd.Len()) *opts.ResponseMetadata = metadata.New(gmd.Len())
for k, v := range gmd { for k, v := range gmd {
opts.ResponseMetadata.Set(k, strings.Join(v, ",")) opts.ResponseMetadata.Append(k, v...)
} }
} }
@@ -191,24 +188,20 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
var header map[string][]string var header map[string][]string
if md, ok := metadata.FromOutgoingContext(ctx); ok { if md, ok := metadata.FromOutgoingContext(ctx); ok {
header = make(map[string][]string, len(md)) header = metadata.Copy(md)
for k, v := range md {
header[k] = v
}
} else { } else {
header = make(map[string][]string) header = make(map[string][]string)
} }
// set timeout in nanoseconds // set timeout in nanoseconds
if opts.StreamTimeout > time.Duration(0) { if opts.StreamTimeout > time.Duration(0) {
header["Grpc-Timeout"] = append(header["Grpc-Timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) header["grpc-timeout"] = append(header["grpc-timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout)) header["timeout"] = append(header["timeout"], fmt.Sprintf("%dn", opts.RequestTimeout))
} }
// set the content type for the request // set the content type for the request
header["content-type"] = append(header["content-type"], req.ContentType()) header["content-type"] = append(header["content-type"], req.ContentType())
md := gmetadata.MD(metadata.Copy(header)) ctx = gmetadata.NewOutgoingContext(ctx, header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
cf, err := g.newCodec(req.ContentType()) cf, err := g.newCodec(req.ContentType())
if err != nil { if err != nil {
@@ -413,8 +406,6 @@ func (g *grpcClient) Init(opts ...client.Option) error {
g.funcCall = g.fnCall g.funcCall = g.fnCall
g.funcStream = g.fnStream g.funcStream = g.fnStream
g.funcPublish = g.fnPublish
g.funcBatchPublish = g.fnBatchPublish
g.opts.Hooks.EachPrev(func(hook options.Hook) { g.opts.Hooks.EachPrev(func(hook options.Hook) {
switch h := hook.(type) { switch h := hook.(type) {
@@ -422,12 +413,9 @@ func (g *grpcClient) Init(opts ...client.Option) error {
g.funcCall = h(g.funcCall) g.funcCall = h(g.funcCall)
case client.HookStream: case client.HookStream:
g.funcStream = h(g.funcStream) g.funcStream = h(g.funcStream)
case client.HookPublish:
g.funcPublish = h(g.funcPublish)
case client.HookBatchPublish:
g.funcBatchPublish = h(g.funcBatchPublish)
} }
}) })
g.init = true
return nil return nil
} }
@@ -440,37 +428,6 @@ func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...) return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
} }
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if req == nil {
return errors.InternalServerError("go.micro.client", "req is nil")
} else if rsp == nil {
return errors.InternalServerError("go.micro.client", "rsp is nil")
}
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
err := g.funcCall(ctx, req, rsp, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := errors.FromError(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
}
return err
}
func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts // make a copy of call opts
callOpts := g.opts.CallOptions callOpts := g.opts.CallOptions
@@ -601,7 +558,13 @@ func (g *grpcClient) fnCall(ctx context.Context, req client.Request, rsp interfa
return gerr return gerr
} }
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
if req == nil {
return errors.InternalServerError("go.micro.client", "req is nil")
} else if rsp == nil {
return errors.InternalServerError("go.micro.client", "rsp is nil")
}
ts := time.Now() ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span var sp tracer.Span
@@ -609,21 +572,21 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
tracer.WithSpanKind(tracer.SpanKindClient), tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()), tracer.WithSpanLabels("endpoint", req.Endpoint()),
) )
stream, err := g.funcStream(ctx, req, opts...) err := g.funcCall(ctx, req, rsp, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec() g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts) te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds()) g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds()) g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := status.Convert(err); me == nil { if me := errors.FromError(err); me == nil {
sp.Finish() sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc() g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc()
} else { } else {
sp.SetStatus(tracer.SpanStatusError, err.Error()) sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc() g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc()
} }
return stream, err return err
} }
func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
@@ -754,6 +717,31 @@ func (g *grpcClient) fnStream(ctx context.Context, req client.Request, opts ...c
return nil, grr return nil, grr
} }
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ts := time.Now()
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc()
var sp tracer.Span
ctx, sp = g.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client",
tracer.WithSpanKind(tracer.SpanKindClient),
tracer.WithSpanLabels("endpoint", req.Endpoint()),
)
stream, err := g.funcStream(ctx, req, opts...)
g.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec()
te := time.Since(ts)
g.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds())
g.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds())
if me := status.Convert(err); me == nil {
sp.Finish()
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
} else {
sp.SetStatus(tracer.SpanStatusError, err.Error())
g.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code()))).Inc()
}
return stream, err
}
func (g *grpcClient) String() string { func (g *grpcClient) String() string {
return "grpc" return "grpc"
} }
@@ -833,8 +821,6 @@ func NewClient(opts ...client.Option) client.Client {
c.funcCall = c.fnCall c.funcCall = c.fnCall
c.funcStream = c.fnStream c.funcStream = c.fnStream
c.funcPublish = c.fnPublish
c.funcBatchPublish = c.fnBatchPublish
return c return c
} }