Decimal128 support for Parquet (#4362)
* Decimal128 support for Parquet

Signed-off-by: Kuhu Shukla <kuhus@nvidia.com>

Co-authored-by: Kuhu Shukla <kuhus@nvidia.com>
Kuhu Shukla and kuhushukla authored Jan 5, 2022
1 parent b3d37ae commit 048038c
Showing 9 changed files with 72 additions and 46 deletions.
18 changes: 9 additions & 9 deletions docs/supported_ops.md
@@ -626,7 +626,7 @@ Accelerator supports are described below.
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><em>PS<br/>128bit decimal only supported for Orc</em></td>
<td><em>PS<br/>128bit decimal only supported for Orc and Parquet</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -17509,13 +17509,13 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -17530,13 +17530,13 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, UDT</em></td>
<td><b>NS</b></td>
</tr>
</table>
1 change: 1 addition & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -949,6 +949,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
decimal_64_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(7, 3, nullable=False), DecimalGen(12, 2, nullable=False), DecimalGen(18, -3, nullable=False)]]
decimal_128_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False),
DecimalGen(36, -5, nullable=False)]]
decimal_128_no_neg_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False)]]

# Some map gens, but not all because of nesting
map_gens_sample = all_basic_map_gens + [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
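Note: the new `decimal_128_no_neg_map_gens` follows the same `DecimalGen(precision, scale, nullable)` pattern as the existing 64-bit generators. A minimal sketch of exercising one of these generators directly, assuming a live SparkSession and that the helpers from this file (`DecimalGen`, `MapGen`, `gen_df`) are importable:

```python
# Hypothetical standalone check; the names come from integration_tests data_gen.py.
from data_gen import DecimalGen, MapGen, gen_df

# Precision 38, scale 38: every digit is after the decimal point, one of the
# boundary cases the new 128-bit generators cover.
dec_map_gen = MapGen(key_gen=DecimalGen(38, 38, nullable=False),
                     value_gen=DecimalGen(38, 38, nullable=False),
                     nullable=False)

def show_sample(spark):
    # gen_df builds a DataFrame whose columns follow the given generators.
    df = gen_df(spark, [('m', dec_map_gen)], length=5)
    df.printSchema()            # m: map<decimal(38,38), decimal(38,38)>
    df.show(truncate=False)
```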
60 changes: 32 additions & 28 deletions integration_tests/src/main/python/parquet_test.py

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -34,7 +34,8 @@
coalesce_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]
parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit]
parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit,
decimal_gen_20_2, decimal_gen_36_5, decimal_gen_38_0, decimal_gen_38_10]
parquet_decimal_struct_gen= StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_decimal_gens)])
writer_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'}
@@ -57,7 +58,7 @@ def limited_int96():

parquet_basic_map_gens = [MapGen(f(nullable=False), f()) for f in
[BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen,
limited_timestamp]] + [simple_string_to_string_map_gen]
limited_timestamp]] + [simple_string_to_string_map_gen] + decimal_128_no_neg_map_gens

parquet_struct_gen = [StructGen([['child' + str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_basic_gen)]),
StructGen([['child0', StructGen([['child1', byte_gen]])]]),
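Because `parquet_basic_map_gens` now appends `decimal_128_no_neg_map_gens`, every parametrized map-write test in this file picks up the 128-bit decimal cases automatically. A sketch of the shape such a test takes, with the helper signatures treated as assumptions (`assert_gpu_and_cpu_writes_are_equal_collect`, `gen_df`, and `idfn` as used elsewhere in the integration tests):

```python
import pytest
from asserts import assert_gpu_and_cpu_writes_are_equal_collect
from data_gen import gen_df, idfn

@pytest.mark.parametrize('data_gen', parquet_basic_map_gens, ids=idfn)
def test_map_round_trip(spark_tmp_path, data_gen):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    assert_gpu_and_cpu_writes_are_equal_collect(
        # Write the generated map column (runs once on CPU, once on GPU)...
        lambda spark, path: gen_df(spark, [('m', data_gen)]).write.parquet(path),
        # ...then read both outputs back and compare the collected rows.
        lambda spark, path: spark.read.parquet(path),
        data_path,
        conf=writer_confs)
```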
@@ -824,9 +824,9 @@ object GpuOverrides extends Logging {
cudfWrite = TypeSig.none,
sparkSig = TypeSig.atomics)),
(ParquetFormatType, FileFormatChecks(
cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + TypeSig.ARRAY +
TypeSig.MAP).nested(),
cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT +
cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT +
TypeSig.ARRAY + TypeSig.MAP).nested(),
cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT +
TypeSig.ARRAY + TypeSig.MAP).nested(),
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())),
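The practical effect of replacing `DECIMAL_64` with `DECIMAL_128_FULL` in the Parquet read and write signatures is that decimal columns above precision 18 no longer force a CPU fallback. A hedged end-to-end illustration in PySpark (arbitrary path and precision/scale choice; assumes the RAPIDS plugin is on the classpath with `spark.rapids.sql.enabled=true`):

```python
from decimal import Decimal
from pyspark.sql.types import DecimalType, StructField, StructType

def round_trip_decimal128(spark, path='/tmp/decimal128_demo'):
    # Precision 38 > 18, so this column needs true 128-bit decimal support.
    schema = StructType([StructField('amount', DecimalType(38, 10))])
    df = spark.createDataFrame(
        [(Decimal('1234567890123456789012345678.0123456789'),)], schema)
    df.write.mode('overwrite').parquet(path)   # eligible for the GPU after this change
    return spark.read.parquet(path).collect()  # likewise for the read side
```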
@@ -3480,7 +3480,7 @@ object GpuOverrides extends Logging {
exec[DataWritingCommandExec](
"Writing data",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL.withPsNote(
TypeEnum.DECIMAL, "128bit decimal only supported for Orc") +
TypeEnum.DECIMAL, "128bit decimal only supported for Orc and Parquet") +
TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Only supported for Parquet") +
TypeSig.MAP.withPsNote(TypeEnum.MAP, "Only supported for Parquet") +
TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Only supported for Parquet")).nested(),
@@ -795,7 +795,8 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
field => {
if (field.isPrimitive) {
val t = field.getOriginalType
(t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32)
(t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) ||
(t == OriginalType.UINT_32) || (t == OriginalType.UINT_64)
} else {
existsUnsignedType(field.asGroupType)
}
Expand All @@ -804,7 +805,12 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
}

def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = {
cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType())
// UINT64 is cast to Decimal(20, 0) by Spark to accommodate
// the largest possible values this type can take. Other unsigned data types are
// converted to wider signed types such as LongType; this is analogous, except here
// we spill over to a larger decimal type.
cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) ||
cv.getType.equals(DType.UINT64)
}

def needUnsignedToSignedCast(cv: ColumnView, dt: DataType): Boolean = {
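The comment in `needDecimalCast` rests on a size argument worth making explicit: the largest UINT64 value overflows a signed 64-bit `LongType` but always fits in 20 decimal digits, hence Spark's choice of `Decimal(20, 0)`. A quick arithmetic check:

```python
# Why Parquet UINT64 maps to DecimalType(20, 0) rather than LongType.
max_uint64 = 2**64 - 1
max_long = 2**63 - 1

print(max_uint64)             # 18446744073709551615
print(max_uint64 > max_long)  # True: too big for a signed 64-bit long
print(len(str(max_uint64)))   # 20: DecimalType(20, 0) is the smallest exact fit
```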
Binary file added tests/src/test/resources/test_unsigned64.parquet
Binary file not shown.
@@ -138,4 +138,18 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite {
assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) {
frame => frame.select(col("*"))
}

/**
* Parquet file with 2 columns
* <simple_uint64, UINT64>
* <arr_uint64, array(UINT64)>
*/
testSparkResultsAreEqual("Test Parquet unsigned int: uint64",
frameFromParquet("test_unsigned64.parquet"),
// CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2.
// The exception is like "Parquet type not supported: INT64 (UINT_64)"
assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) {
frame => frame.select(col("*"))
}
}
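The new test reads a checked-in binary fixture whose contents are described only by the comment above. One plausible way to produce a file of that shape, shown with PyArrow purely as an illustration (the fixture's actual values and row count are not part of this commit's text):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# A uint64 scalar column plus an array-of-uint64 column, matching the
# <simple_uint64, UINT64> / <arr_uint64, array(UINT64)> layout described above.
table = pa.table({
    'simple_uint64': pa.array([0, 1, 2**64 - 1], type=pa.uint64()),
    'arr_uint64': pa.array([[0], [1, 2], [2**64 - 1]],
                           type=pa.list_(pa.uint64())),
})
pq.write_table(table, 'test_unsigned64.parquet')
```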
2 changes: 1 addition & 1 deletion tools/src/main/resources/supportedDataSource.csv
@@ -2,5 +2,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,
CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA
ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS
ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Parquet,read,S,S,S,S,S,S,S,S,PS,S,PS,NA,NS,NA,PS,PS,PS,NS
Parquet,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS
Parquet,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
