fix grpc json streaming by setting content sub type (#1089)

This commit is contained in:
Asim Aslam 2020-01-07 18:37:34 +00:00 committed by GitHub
parent 1892bd05a5
commit 0b8ff3a8bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 3 deletions

View File

@ -116,10 +116,12 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
} }
func (jsonCodec) Unmarshal(data []byte, v interface{}) error { func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
if len(data) == 0 {
return nil
}
if pb, ok := v.(proto.Message); ok { if pb, ok := v.(proto.Message); ok {
return jsonpb.Unmarshal(b.NewReader(data), pb) return jsonpb.Unmarshal(b.NewReader(data), pb)
} }
return json.Unmarshal(data, v) return json.Unmarshal(data, v)
} }

View File

@ -207,7 +207,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true, ServerStreams: true,
} }
grpcCallOptions := []grpc.CallOption{} grpcCallOptions := []grpc.CallOption{grpc.CallContentSubtype(cf.Name())}
if opts := g.getGrpcCallOptions(); opts != nil { if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...) grpcCallOptions = append(grpcCallOptions, opts...)
} }

View File

@ -93,10 +93,12 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
} }
func (jsonCodec) Unmarshal(data []byte, v interface{}) error { func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
if len(data) == 0 {
return nil
}
if pb, ok := v.(proto.Message); ok { if pb, ok := v.(proto.Message); ok {
return jsonpb.Unmarshal(b.NewReader(data), pb) return jsonpb.Unmarshal(b.NewReader(data), pb)
} }
return json.Unmarshal(data, v) return json.Unmarshal(data, v)
} }

View File

@ -203,9 +203,13 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
// get content type // get content type
ct := defaultContentType ct := defaultContentType
if ctype, ok := md["x-content-type"]; ok { if ctype, ok := md["x-content-type"]; ok {
ct = ctype ct = ctype
} }
if ctype, ok := md["content-type"]; ok {
ct = ctype
}
delete(md, "x-content-type") delete(md, "x-content-type")
delete(md, "timeout") delete(md, "timeout")