diff --git a/client/grpc/codec.go b/client/grpc/codec.go index 44f16d54..70ef0b99 100644 --- a/client/grpc/codec.go +++ b/client/grpc/codec.go @@ -116,10 +116,12 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) { } func (jsonCodec) Unmarshal(data []byte, v interface{}) error { + if len(data) == 0 { + return nil + } if pb, ok := v.(proto.Message); ok { return jsonpb.Unmarshal(b.NewReader(data), pb) } - return json.Unmarshal(data, v) } diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index e8e23a36..3d158ea0 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -207,7 +207,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client ServerStreams: true, } - grpcCallOptions := []grpc.CallOption{} + grpcCallOptions := []grpc.CallOption{grpc.CallContentSubtype(cf.Name())} if opts := g.getGrpcCallOptions(); opts != nil { grpcCallOptions = append(grpcCallOptions, opts...) } diff --git a/server/grpc/codec.go b/server/grpc/codec.go index 8de1b8bf..dff9f600 100644 --- a/server/grpc/codec.go +++ b/server/grpc/codec.go @@ -93,10 +93,12 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) { } func (jsonCodec) Unmarshal(data []byte, v interface{}) error { + if len(data) == 0 { + return nil + } if pb, ok := v.(proto.Message); ok { return jsonpb.Unmarshal(b.NewReader(data), pb) } - return json.Unmarshal(data, v) } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index 1113f842..874f7273 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -203,9 +203,13 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { // get content type ct := defaultContentType + if ctype, ok := md["x-content-type"]; ok { ct = ctype } + if ctype, ok := md["content-type"]; ok { + ct = ctype + } delete(md, "x-content-type") delete(md, "timeout")