support response metadata

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2022-11-16 23:10:27 +03:00
parent 0f32fad4c0
commit bd7dbe94ca
6 changed files with 85 additions and 66 deletions

112
grpc.go
View File

@@ -78,10 +78,15 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
} else {
header = make(map[string]string, 2)
}
if opts.RequestMetadata != nil {
for k, v := range opts.RequestMetadata {
header[k] = v
}
}
// set timeout in nanoseconds
header["Grpc-Timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header["timeout"] = fmt.Sprintf("%dn", opts.RequestTimeout)
header["content-type"] = req.ContentType()
md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
@@ -113,7 +118,7 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
),
}
if opts := g.getGrpcDialOptions(); opts != nil {
if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
@@ -127,15 +132,22 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
}()
ch := make(chan error, 1)
var gmd gmetadata.MD
grpcCallOptions := []grpc.CallOption{
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()),
}
if opts := g.getGrpcCallOptions(opts.Context); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
if opts.ResponseMetadata != nil {
gmd = gmetadata.MD{}
grpcCallOptions = append(grpcCallOptions, grpc.Header(&gmd))
}
go func() {
grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(&wrapMicroCodec{cf}),
grpc.CallContentSubtype((&wrapMicroCodec{cf}).Name()),
}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpcCallOptions...)
ch <- microError(err)
}()
@@ -147,6 +159,13 @@ func (g *grpcClient) call(ctx context.Context, addr string, req client.Request,
grr = errors.Timeout("go.micro.client", "%v", ctx.Err())
}
if opts.ResponseMetadata != nil {
*opts.ResponseMetadata = metadata.New(gmd.Len())
for k, v := range gmd {
opts.ResponseMetadata.Set(k, strings.Join(v, ","))
}
}
return grr
}
@@ -168,7 +187,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
header["timeout"] = fmt.Sprintf("%dn", opts.StreamTimeout)
}
// set the content type for the request
header["x-content-type"] = req.ContentType()
header["content-type"] = req.ContentType()
md := gmetadata.New(header)
ctx = gmetadata.NewOutgoingContext(ctx, md)
@@ -200,7 +219,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
),
}
if opts := g.getGrpcDialOptions(); opts != nil {
if opts := g.getGrpcDialOptions(opts.Context); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
@@ -216,12 +235,17 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
}
grpcCallOptions := []grpc.CallOption{
grpc.ForceCodec(wc),
// grpc.ForceCodec(wc),
grpc.CallContentSubtype(wc.Name()),
}
if opts := g.getGrpcCallOptions(); opts != nil {
if opts := g.getGrpcCallOptions(opts.Context); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
var gmd gmetadata.MD
if opts.ResponseMetadata != nil {
gmd = gmetadata.MD{}
grpcCallOptions = append(grpcCallOptions, grpc.Header(&gmd))
}
// create a new cancelling context
newCtx, cancel := context.WithCancel(ctx)
@@ -267,6 +291,7 @@ func (g *grpcClient) stream(ctx context.Context, addr string, req client.Request
// set the stream as the response
val := reflect.ValueOf(rsp).Elem()
val.Set(reflect.ValueOf(stream).Elem())
return nil
}
@@ -391,6 +416,7 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
}
// make a copy of call opts
callOpts := g.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
@@ -728,41 +754,45 @@ func (g *grpcClient) Name() string {
return g.opts.Name
}
func (g *grpcClient) getGrpcDialOptions() []grpc.DialOption {
if g.opts.CallOptions.Context == nil {
return nil
func (g *grpcClient) getGrpcDialOptions(ctx context.Context) []grpc.DialOption {
var opts []grpc.DialOption
if g.opts.CallOptions.Context != nil {
if v := g.opts.CallOptions.Context.Value(grpcDialOptions{}); v != nil {
if vopts, ok := v.([]grpc.DialOption); ok {
opts = append(opts, vopts...)
}
}
}
v := g.opts.CallOptions.Context.Value(grpcDialOptions{})
if v == nil {
return nil
}
opts, ok := v.([]grpc.DialOption)
if !ok {
return nil
if ctx != nil {
if v := ctx.Value(grpcDialOptions{}); v != nil {
if vopts, ok := v.([]grpc.DialOption); ok {
opts = append(opts, vopts...)
}
}
}
return opts
}
func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
if g.opts.CallOptions.Context == nil {
return nil
func (g *grpcClient) getGrpcCallOptions(ctx context.Context) []grpc.CallOption {
var opts []grpc.CallOption
if g.opts.CallOptions.Context != nil {
if v := g.opts.CallOptions.Context.Value(grpcCallOptions{}); v != nil {
if vopts, ok := v.([]grpc.CallOption); ok {
opts = append(opts, vopts...)
}
}
}
v := g.opts.CallOptions.Context.Value(grpcCallOptions{})
if v == nil {
return nil
}
opts, ok := v.([]grpc.CallOption)
if !ok {
return nil
if ctx != nil {
if v := ctx.Value(grpcCallOptions{}); v != nil {
if vopts, ok := v.([]grpc.CallOption); ok {
opts = append(opts, vopts...)
}
}
}
return opts
@@ -771,7 +801,9 @@ func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
func NewClient(opts ...client.Option) client.Client {
options := client.NewOptions(opts...)
// default content type for grpc
options.ContentType = DefaultContentType
if options.ContentType == "" {
options.ContentType = DefaultContentType
}
rc := &grpcClient{
opts: options,