Add in basic support for reading lists from parquet #1127

Merged · 2 commits · Nov 16, 2020
Changes from 1 commit
48 changes: 38 additions & 10 deletions integration_tests/src/main/python/parquet_test.py
@@ -29,7 +29,10 @@ def read_parquet_sql(data_path):

parquet_gens_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen,
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))],
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), ArrayGen(byte_gen),
ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(ArrayGen(byte_gen))],
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/132'))]

# test with original parquet file reader, the multi-file parallel reader for cloud, and coalesce file reader for
@@ -51,7 +54,9 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs,
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, 'spark.sql.legacy.parquet.datetimeRebaseModeInRead': 'CORRECTED'})
# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed we can remove the
# spark.sql.legacy.parquet.datetimeRebaseModeInRead config, which is a workaround for nested timestamp/date support
assert_gpu_and_cpu_are_equal_collect(read_func(data_path),
conf=all_confs)
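As a rough standalone illustration (not part of this PR) of the workaround the comment above describes, both rebase configs can be set to CORRECTED so an array<timestamp> column round-trips through Parquet. The session setup, output path, and column name below are assumptions for the sketch:

```python
from datetime import datetime, timezone
from pyspark.sql import SparkSession

# Set both rebase modes to CORRECTED so nested timestamp/date columns avoid the
# legacy rebase path (the workaround until issue #1126 is resolved).
spark = (SparkSession.builder
         .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
         .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
         .getOrCreate())

# One row with a single array<timestamp> column (illustrative data only).
df = spark.createDataFrame(
    [([datetime(1990, 1, 1, tzinfo=timezone.utc)],)],
    "ts_list array<timestamp>")
df.write.mode("overwrite").parquet("/tmp/parquet_ts_list")
spark.read.parquet("/tmp/parquet_ts_list").show(truncate=False)
```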

@@ -119,14 +124,36 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']


# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed, delete this test and merge it
# into test_ts_read_round_trip. Nested timestamps and dates are not supported right now.
@pytest.mark.parametrize('gen', [ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1126')
def test_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=all_confs)

# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))
def test_ts_read_round_trip(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
@@ -143,14 +170,15 @@ def readParquetCatchException(spark, data_path):
df = spark.read.parquet(data_path).collect()
assert e_info.match(r".*SparkUpgradeException.*")

# Once https://github.com/NVIDIA/spark-rapids/issues/1126 is fixed, nested timestamps and dates should be added in
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
@pytest.mark.parametrize('gen', [TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))], ids=idfn)
@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['LEGACY'])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.shims.spark300
import java.time.ZoneId

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager

import org.apache.spark.SparkEnv
@@ -136,7 +135,10 @@ class Spark300Shims extends SparkShims {
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
GpuOverrides.isSupportedType(t,
allowArray = true,
allowStringMaps = true,
allowNesting = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty
@@ -18,14 +18,15 @@ package com.nvidia.spark.rapids.shims.spark300db

import java.time.ZoneId

import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark300.Spark300Shims

import org.apache.spark.sql.rapids.shims.spark300db._
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkEnv

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.InternalRow
@@ -91,7 +92,10 @@ class Spark300dbShims extends Spark300Shims {
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
GpuOverrides.isSupportedType(t,
allowArray = true,
allowStringMaps = true,
allowNesting = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty
@@ -16,11 +16,10 @@

package com.nvidia.spark.rapids.shims.spark301db

import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import org.apache.spark.sql.rapids.shims.spark301db._

import org.apache.spark.sql.rapids.shims.spark301db._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -84,7 +83,10 @@ class Spark301dbShims extends Spark301Shims {
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
GpuOverrides.isSupportedType(t,
allowArray = true,
allowStringMaps = true,
allowNesting = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty
@@ -17,7 +17,6 @@
package com.nvidia.spark.rapids.shims.spark310

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark310.RapidsShuffleManager

@@ -137,12 +136,15 @@ class Spark310Shims extends Spark301Shims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowArray = true,
allowStringMaps = true,
allowNesting = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
@@ -214,9 +216,6 @@ class Spark310Shims extends Spark301Shims {
a.dataFilters,
conf)
}

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -27,6 +27,7 @@ object RebaseHelper extends Arm {
private[this] def isDateTimeRebaseNeeded(column: ColumnVector,
startDay: Int,
startTs: Long): Boolean = {
// TODO update this for nested column checks
val dtype = column.getType
if (dtype == DType.TIMESTAMP_DAYS) {
withResource(Scalar.timestampDaysFromInt(startDay)) { minGood =>
@@ -1845,6 +1845,12 @@ object GpuOverrides {
override val childScans: scala.Seq[ScanMeta[_]] =
Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this)))

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowNesting = true)

override def convertToGpu(): GpuExec =
GpuBatchScanExec(p.output, childScans(0).convertToGpu())
}),
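As a hedged usage sketch of what these overrides enable (not part of this PR): with the RAPIDS plugin enabled, a Parquet read of a list column should now plan as a GPU scan (for example GpuFileSourceScanExec or GpuBatchScanExec in the explain() output) rather than falling back to the CPU. The path below is carried over from the earlier sketch and is an assumption:

```python
from pyspark.sql import SparkSession

# Assumes a session that already has the RAPIDS plugin configured and enabled.
spark = SparkSession.builder.getOrCreate()
spark.read.parquet("/tmp/parquet_ts_list").explain()
```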
@@ -62,7 +62,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.InputFileUtils
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{MapType, StringType, StructType, TimestampType}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, MapType, StringType, StructType, TimestampType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

@@ -134,8 +134,11 @@ object GpuParquetScanBase {
}

for (field <- readSchema) {
if (!GpuColumnVector.isSupportedType(field.dataType)
&& !field.dataType.isInstanceOf[MapType]) {
if (!GpuOverrides.isSupportedType(
field.dataType,
allowStringMaps = true,
allowArray = true,
allowNesting = true)) {
meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
}
}
@@ -153,6 +156,17 @@
val schemaHasTimestamps = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
}
def isTsOrDate(dt: DataType) : Boolean = dt match {
case TimestampType | DateType => true
case _ => false
}
val schemaMightNeedNestedRebase = readSchema.exists { field =>
field.dataType match {
case MapType(_, _, _) | ArrayType(_, _) | StructType(_) =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, isTsOrDate)
case _ => false
}
}

// Currently timestamp conversion is not supported.
// If support needs to be added then we need to follow the logic in Spark's
@@ -168,9 +182,16 @@
}

sqlConf.get(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key) match {
case "EXCEPTION" => // Good
case "EXCEPTION" => if (schemaMightNeedNestedRebase) {
meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} is EXCEPTION")
}
case "CORRECTED" => // Good
case "LEGACY" => // Good, but it really is EXCEPTION for us...
case "LEGACY" => // really is EXCEPTION for us...
if (schemaMightNeedNestedRebase) {
meta.willNotWorkOnGpu("Nested timestamp and date values are not supported when " +
s"${SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key} is LEGACY")
}
case other =>
meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode")
}
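For readers following the new check, here is a rough Python analogue (illustration only; the helper names are made up) of the schemaMightNeedNestedRebase logic added above: a field triggers the restriction when it is a map, array, or struct that contains a timestamp or date anywhere inside it.

```python
from pyspark.sql.types import (ArrayType, DataType, DateType, MapType,
                               StructType, TimestampType)

def _contains_ts_or_date(dt: DataType) -> bool:
    # Recursive walk mirroring TrampolineUtil.dataTypeExistsRecursively(_, isTsOrDate).
    if isinstance(dt, (TimestampType, DateType)):
        return True
    if isinstance(dt, ArrayType):
        return _contains_ts_or_date(dt.elementType)
    if isinstance(dt, MapType):
        return _contains_ts_or_date(dt.keyType) or _contains_ts_or_date(dt.valueType)
    if isinstance(dt, StructType):
        return any(_contains_ts_or_date(f.dataType) for f in dt.fields)
    return False

def schema_might_need_nested_rebase(schema: StructType) -> bool:
    # Only nested containers matter here; a top-level timestamp/date is handled
    # by the existing (non-nested) rebase checks.
    return any(isinstance(f.dataType, (MapType, ArrayType, StructType))
               and _contains_ts_or_date(f.dataType)
               for f in schema.fields)
```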