From bd7dbe94caa9654e649b45dece903c465e8aba69 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 16 Nov 2022 23:10:27 +0300 Subject: [PATCH] support response metadata Signed-off-by: Vasiliy Tolstov --- go.mod | 3 +- go.sum | 13 +++--- grpc.go | 112 +++++++++++++++++++++++++++++++++------------------- options.go | 8 ---- request.go | 13 +++--- response.go | 2 +- 6 files changed, 85 insertions(+), 66 deletions(-) diff --git a/go.mod b/go.mod index 60ab7d9..fd1f498 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module go.unistack.org/micro-client-grpc/v3 go 1.16 require ( - github.com/google/gnostic v0.6.9 // indirect - go.unistack.org/micro/v3 v3.9.11 + go.unistack.org/micro/v3 v3.9.13 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect golang.org/x/sys v0.0.0-20220429233432-b5fbb4746d32 // indirect google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e // indirect diff --git a/go.sum b/go.sum index ab766f8..23d1bba 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/gnostic v0.6.6/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= -github.com/google/gnostic v0.6.8/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0= github.com/google/gnostic v0.6.9/go.mod h1:Nm8234We1lq6iB9OmlgNv3nH91XLLVZHCDayfA3xq+E= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -84,10 +82,10 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2 github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= -go.unistack.org/micro-proto/v3 v3.2.7 h1:zG6d69kHc+oij2lwQ3AfrCgdjiEVRG2A7TlsxjusWs4= -go.unistack.org/micro-proto/v3 v3.2.7/go.mod h1:ZltVWNECD5yK+40+OCONzGw4OtmSdTpVi8/KFgo9dqM= -go.unistack.org/micro/v3 v3.9.11 h1:dsZVss3nvfByL1ZDJNnUVQB1N8w6qn4pr9vIkeBiii8= -go.unistack.org/micro/v3 v3.9.11/go.mod h1:LrBm9Fsf4MbY8DlUbdwRTYJosMkN8wrtOQHoYBlMkz8= +go.unistack.org/micro-proto/v3 v3.3.1 h1:nQ0MtWvP2G3QrpOgawVOPhpZZYkq6umTGDqs8FxJYIo= +go.unistack.org/micro-proto/v3 v3.3.1/go.mod h1:cwRyv8uInM2I7EbU7O8Fx2Ls3N90Uw9UCCcq4olOdfE= +go.unistack.org/micro/v3 v3.9.13 h1:x1/H4rDgvz2JWXx6UvFiWDyRLxg2Kpp6V1kjzpLlyvo= +go.unistack.org/micro/v3 v3.9.13/go.mod h1:gI4RkJKHLPW7KV6h4+ZBOZD997MRvFRXMPQIHpozikI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -173,8 +171,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/grpc.go b/grpc.go index f9543b0..98aa30c 100644 --- a/grpc.go +++ b/grpc.go @@ -78,10 +78,15 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, } else { header = make(map[string]string, 2) } - + if opts.RequestMetadata != nil { + for k, v := range opts.RequestMetadata { + header[k] = v + } + } // set timeout in nanoseconds header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout) header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout) + header["content-type"] = req.ContentType() md := gmetadata.New(header) ctx = gmetadata.NewOutgoingContext(ctx, md) @@ -113,7 +118,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, ), } - if opts := g.getGrpcDialOptions(); opts != nil { + if opts := g.getGrpcDialOptions(opts.Context); opts != nil { grpcDialOptions = append(grpcDialOptions, opts...) } @@ -127,15 +132,22 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, }() ch := make(chan error, 1) + var gmd gmetadata.MD + + grpcCallOptions := []grpc.CallOption{ + grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()), + } + + if opts := g.getGrpcCallOptions(opts.Context); opts != nil { + grpcCallOptions = append(grpcCallOptions, opts...) + } + + if opts.ResponseMetadata != nil { + gmd = gmetadata.MD{} + grpcCallOptions = append(grpcCallOptions, grpc.Header(&gmd)) + } go func() { - grpcCallOptions := []grpc.CallOption{ - grpc.ForceCodec(&wrapMicroCodec{cf}), - grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()), - } - if opts := g.getGrpcCallOptions(); opts != nil { - grpcCallOptions = append(grpcCallOptions, opts...) - } err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpcCallOptions...) ch <- microError(err) }() @@ -147,6 +159,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request, grr = errors.Timeout("go.micro.client", "%v", ctx.Err()) } + if opts.ResponseMetadata != nil { + *opts.ResponseMetadata = metadata.New(gmd.Len()) + for k, v := range gmd { + opts.ResponseMetadata.Set(k, strings.Join(v, ",")) + } + } + return grr } @@ -168,7 +187,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout) } // set the content type for the request - header["x-content-type"] = req.ContentType() + header["content-type"] = req.ContentType() md := gmetadata.New(header) ctx = gmetadata.NewOutgoingContext(ctx, md) @@ -200,7 +219,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request ), } - if opts := g.getGrpcDialOptions(); opts != nil { + if opts := g.getGrpcDialOptions(opts.Context); opts != nil { grpcDialOptions = append(grpcDialOptions, opts...) } @@ -216,12 +235,17 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request } grpcCallOptions := []grpc.CallOption{ - grpc.ForceCodec(wc), + // grpc.ForceCodec(wc), grpc.CallContentSubtype(wc.Name()), } - if opts := g.getGrpcCallOptions(); opts != nil { + if opts := g.getGrpcCallOptions(opts.Context); opts != nil { grpcCallOptions = append(grpcCallOptions, opts...) } + var gmd gmetadata.MD + if opts.ResponseMetadata != nil { + gmd = gmetadata.MD{} + grpcCallOptions = append(grpcCallOptions, grpc.Header(&gmd)) + } // create a new cancelling context newCtx, cancel := context.WithCancel(ctx) @@ -267,6 +291,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request // set the stream as the response val := reflect.ValueOf(rsp).Elem() val.Set(reflect.ValueOf(stream).Elem()) + return nil } @@ -391,6 +416,7 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface } // make a copy of call opts callOpts := g.opts.CallOptions + for _, opt := range opts { opt(&callOpts) } @@ -728,41 +754,45 @@ func (g *grpcClient) Name() string { return g.opts.Name } -func (g *grpcClient) getGrpcDialOptions() []grpc.DialOption { - if g.opts.CallOptions.Context == nil { - return nil +func (g *grpcClient) getGrpcDialOptions(ctx context.Context) []grpc.DialOption { + var opts []grpc.DialOption + + if g.opts.CallOptions.Context != nil { + if v := g.opts.CallOptions.Context.Value(grpcDialOptions{}); v != nil { + if vopts, ok := v.([]grpc.DialOption); ok { + opts = append(opts, vopts...) + } + } } - v := g.opts.CallOptions.Context.Value(grpcDialOptions{}) - - if v == nil { - return nil - } - - opts, ok := v.([]grpc.DialOption) - - if !ok { - return nil + if ctx != nil { + if v := ctx.Value(grpcDialOptions{}); v != nil { + if vopts, ok := v.([]grpc.DialOption); ok { + opts = append(opts, vopts...) + } + } } return opts } -func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { - if g.opts.CallOptions.Context == nil { - return nil +func (g *grpcClient) getGrpcCallOptions(ctx context.Context) []grpc.CallOption { + var opts []grpc.CallOption + + if g.opts.CallOptions.Context != nil { + if v := g.opts.CallOptions.Context.Value(grpcCallOptions{}); v != nil { + if vopts, ok := v.([]grpc.CallOption); ok { + opts = append(opts, vopts...) + } + } } - v := g.opts.CallOptions.Context.Value(grpcCallOptions{}) - - if v == nil { - return nil - } - - opts, ok := v.([]grpc.CallOption) - - if !ok { - return nil + if ctx != nil { + if v := ctx.Value(grpcCallOptions{}); v != nil { + if vopts, ok := v.([]grpc.CallOption); ok { + opts = append(opts, vopts...) + } + } } return opts @@ -771,7 +801,9 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption { func NewClient(opts ...client.Option) client.Client { options := client.NewOptions(opts...) // default content type for grpc - options.ContentType = DefaultContentType + if options.ContentType == "" { + options.ContentType = DefaultContentType + } rc := &grpcClient{ opts: options, diff --git a/options.go b/options.go index d64c43d..30fc4a6 100644 --- a/options.go +++ b/options.go @@ -70,9 +70,7 @@ func Codec(contentType string, c encoding.Codec) client.Option { type maxRecvMsgSizeKey struct{} -// // MaxRecvMsgSize set the maximum size of message that client can receive. -// func MaxRecvMsgSize(s int) client.Option { return func(o *client.Options) { if o.Context == nil { @@ -84,9 +82,7 @@ func MaxRecvMsgSize(s int) client.Option { type maxSendMsgSizeKey struct{} -// // MaxSendMsgSize set the maximum size of message that client can send. -// func MaxSendMsgSize(s int) client.Option { return func(o *client.Options) { if o.Context == nil { @@ -98,9 +94,7 @@ func MaxSendMsgSize(s int) client.Option { type grpcDialOptions struct{} -// // DialOptions to be used to configure gRPC dial options -// func DialOptions(opts ...grpc.DialOption) client.CallOption { return func(o *client.CallOptions) { if o.Context == nil { @@ -112,9 +106,7 @@ func DialOptions(opts ...grpc.DialOption) client.CallOption { type grpcCallOptions struct{} -// // CallOptions to be used to configure gRPC call options -// func CallOptions(opts ...grpc.CallOption) client.CallOption { return func(o *client.CallOptions) { if o.Context == nil { diff --git a/request.go b/request.go index 84d1bad..a0f808e 100644 --- a/request.go +++ b/request.go @@ -38,15 +38,12 @@ func methodToGRPC(service, method string) string { return fmt.Sprintf("/%s.%s/%s", service, mParts[0], mParts[1]) } -func newGRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { - var opts client.RequestOptions - for _, o := range reqOpts { - o(&opts) - } +func newGRPCRequest(service, method string, request interface{}, contentType string, opts ...client.RequestOption) client.Request { + options := client.NewRequestOptions(opts...) // set the content-type specified - if len(opts.ContentType) > 0 { - contentType = opts.ContentType + if len(options.ContentType) > 0 { + contentType = options.ContentType } return &grpcRequest{ @@ -54,7 +51,7 @@ func newGRPCRequest(service, method string, request interface{}, contentType str method: method, request: request, contentType: contentType, - opts: opts, + opts: options, } } diff --git a/response.go b/response.go index eb94c66..5fbda87 100644 --- a/response.go +++ b/response.go @@ -23,7 +23,7 @@ func (r *response) Codec() codec.Codec { func (r *response) Header() metadata.Metadata { meta, err := r.stream.Header() if err != nil { - return metadata.New(0) + return nil } md := metadata.New(len(meta)) for k, v := range meta {