From 1ad3f77338d182904beae5bb56e67f03428c28d5 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 10 Sep 2020 11:46:00 -0700 Subject: [PATCH 1/8] parquet writer support for TIMESTAMP_MILLIS Signed-off-by: Raza Jafri --- .../src/main/python/parquet_test.py | 9 ++++++++ .../spark/rapids/ColumnarOutputWriter.scala | 23 +++++++++++++++---- .../spark/rapids/GpuParquetFileFormat.scala | 12 +++++++--- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 844040d4ffd..2302d71c9a4 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -125,6 +125,15 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enab conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) +def test_parquet_write_ts_millis(spark_tmp_path): + gen = TimestampGen() + data_path = spark_tmp_path + '/PARQUET_DATA' + with_gpu_session( + lambda spark : unary_op_df(spark, gen).write.parquet(data_path), + conf={'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MILLIS'}) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.parquet(data_path)) + def readParquetCatchException(spark, data_path): with pytest.raises(Exception) as e_info: df = spark.read.parquet(data_path).collect() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index e3b73932fb3..574f62ec3d1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -18,14 +18,14 @@ package com.nvidia.spark.rapids import scala.collection.mutable -import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter} +import ai.rapids.cudf.{DType, HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.hadoop.fs.{FSDataOutputStream, Path} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.TaskContext import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{DataTypes, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -105,12 +105,27 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, */ def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = { var needToCloseBatch = true + val castToMillis = conf.get(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS, + "false") + val newBatch: ColumnarBatch = if (castToMillis.equals("true")) { + new ColumnarBatch(GpuColumnVector.extractColumns(batch).map(cv => { + if (cv.dataType() == DataTypes.TimestampType) { + new GpuColumnVector(DataTypes.TimestampType, withResource(cv.getBase()) { v => + v.castTo(DType.TIMESTAMP_MILLISECONDS) + }) + } else { + cv + } + })) + } else { + batch + } try { val writeStartTimestamp = System.nanoTime val writeRange = new NvtxRange("File write", NvtxColor.YELLOW) val gpuTime = try { needToCloseBatch = false - writeBatch(batch) + writeBatch(newBatch) } finally { writeRange.close() } @@ -125,7 +140,7 @@ abstract class 
ColumnarOutputWriter(path: String, context: TaskAttemptContext, } } finally { if (needToCloseBatch) { - batch.close() + newBatch.close() } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 6a18ec8c26a..6a60f458abb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types.{DateType, StructType, TimestampType} object GpuParquetFileFormat { + val PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS = "com.nvidia.spark.rapids.parquet.write.castToMillis" + def tagGpuSupport( meta: RapidsMeta[_, _, _], spark: SparkSession, @@ -64,9 +66,9 @@ object GpuParquetFileFormat { TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType]) } if (schemaHasTimestamps) { - // TODO: Could support TIMESTAMP_MILLIS by performing cast on all timestamp input columns sqlConf.parquetOutputTimestampType match { case ParquetOutputTimestampType.TIMESTAMP_MICROS => + case ParquetOutputTimestampType.TIMESTAMP_MILLIS => case t => meta.willNotWorkOnGpu(s"Output timestamp type $t is not supported") } } @@ -162,11 +164,15 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { val outputTimestampType = sparkSession.sessionState.conf.parquetOutputTimestampType if (outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MICROS) { val hasTimestamps = dataSchema.exists { field => - TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType]) + TrampolineUtil.dataTypeExistsRecursively(field.dataType, f => { + f.isInstanceOf[TimestampType] + }) } - if (hasTimestamps) { + if (hasTimestamps && outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MILLIS) { throw new UnsupportedOperationException( s"Unsupported output timestamp type: $outputTimestampType") + } else if (hasTimestamps) { + conf.set(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS, "true") } } conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, outputTimestampType.toString) From f1f2efbdaa06491a7b2242139eaa3d13faa9c4c4 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 10 Sep 2020 20:48:22 -0700 Subject: [PATCH 2/8] addressed review comments Signed-off-by: Raza Jafri --- .../src/main/python/parquet_test.py | 10 +++++--- .../spark/rapids/ColumnarOutputWriter.scala | 25 +++++++++++-------- .../spark/rapids/GpuParquetFileFormat.scala | 4 --- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 2302d71c9a4..dd28cb041b5 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -125,14 +125,18 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enab conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) -def test_parquet_write_ts_millis(spark_tmp_path): +@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS']) +@pytest.mark.parametrize('ts_rebase', ['CORRECTED']) +def test_parquet_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): gen = TimestampGen() data_path = spark_tmp_path + '/PARQUET_DATA' with_gpu_session( lambda spark : unary_op_df(spark, 
gen).write.parquet(data_path), - conf={'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MILLIS'}) + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, + 'spark.sql.parquet.outputTimestampType': ts_type}) assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.read.parquet(data_path)) + lambda spark : spark.read.parquet(data_path), + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase}) def readParquetCatchException(spark, data_path): with pytest.raises(Exception) as e_info: diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index 574f62ec3d1..fa49fa7ff3d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.{FSDataOutputStream, Path} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.TaskContext +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker} import org.apache.spark.sql.types.{DataTypes, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -105,18 +107,19 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, */ def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = { var needToCloseBatch = true - val castToMillis = conf.get(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS, - "false") - val newBatch: ColumnarBatch = if (castToMillis.equals("true")) { - new ColumnarBatch(GpuColumnVector.extractColumns(batch).map(cv => { - if (cv.dataType() == DataTypes.TimestampType) { - new GpuColumnVector(DataTypes.TimestampType, withResource(cv.getBase()) { v => - v.castTo(DType.TIMESTAMP_MILLISECONDS) - }) - } else { - cv + val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) + val newBatch = if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS) { + new ColumnarBatch(GpuColumnVector.extractColumns(batch).map { + cv => { + cv.dataType() match { + case DataTypes.TimestampType => new GpuColumnVector(DataTypes.TimestampType, + withResource(cv.getBase()) { v => + v.castTo(DType.TIMESTAMP_MILLISECONDS) + }) + case _ => cv + } } - })) + }) } else { batch } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 6a60f458abb..76f821e11a6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -34,8 +34,6 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types.{DateType, StructType, TimestampType} object GpuParquetFileFormat { - val PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS = "com.nvidia.spark.rapids.parquet.write.castToMillis" - def tagGpuSupport( meta: RapidsMeta[_, _, _], spark: SparkSession, @@ -171,8 +169,6 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { if (hasTimestamps && outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MILLIS) { throw new UnsupportedOperationException( s"Unsupported output timestamp type: $outputTimestampType") - } else if (hasTimestamps) { - 
conf.set(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS, "true") } } conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, outputTimestampType.toString) From 8e76e6f018c11e3b5488ad291c22dac203b9cb51 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 11 Sep 2020 11:58:19 -0700 Subject: [PATCH 3/8] addressed review comments Signed-off-by: Raza Jafri --- .../src/main/python/parquet_test.py | 26 ++++++++-------- .../spark/rapids/ColumnarOutputWriter.scala | 31 ++++++++++--------- .../spark/rapids/GpuParquetFileFormat.scala | 20 ++++++++---- 3 files changed, 43 insertions(+), 34 deletions(-) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index dd28cb041b5..0a074071ade 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -125,19 +125,6 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enab conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) -@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS']) -@pytest.mark.parametrize('ts_rebase', ['CORRECTED']) -def test_parquet_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): - gen = TimestampGen() - data_path = spark_tmp_path + '/PARQUET_DATA' - with_gpu_session( - lambda spark : unary_op_df(spark, gen).write.parquet(data_path), - conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, - 'spark.sql.parquet.outputTimestampType': ts_type}) - assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.read.parquet(data_path), - conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase}) - def readParquetCatchException(spark, data_path): with pytest.raises(Exception) as e_info: df = spark.read.parquet(data_path).collect() @@ -316,6 +303,19 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list) 'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) +@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS']) +@pytest.mark.parametrize('ts_rebase', ['CORRECTED']) +@ignore_order +def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): + gen = TimestampGen() + data_path = spark_tmp_path + '/PARQUET_DATA' + assert_gpu_and_cpu_writes_are_equal_collect( + lambda spark, path: unary_op_df(spark, gen).write.parquet(path), + lambda spark, path: spark.read.parquet(path), + data_path, + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, + 'spark.sql.parquet.outputTimestampType': ts_type}) + parquet_part_write_gens = [ byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, # Some file systems have issues with UTF8 strings so to help the test pass even there diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index fa49fa7ff3d..9465b5928a1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -66,6 +66,7 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, val tableWriter: TableWriter val conf = context.getConfiguration + val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) private[this] val outputStream: FSDataOutputStream = { val hadoopPath = new Path(path) @@ -107,22 
+108,22 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, */ def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = { var needToCloseBatch = true - val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) - val newBatch = if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS) { - new ColumnarBatch(GpuColumnVector.extractColumns(batch).map { - cv => { - cv.dataType() match { - case DataTypes.TimestampType => new GpuColumnVector(DataTypes.TimestampType, - withResource(cv.getBase()) { v => - v.castTo(DType.TIMESTAMP_MILLISECONDS) - }) - case _ => cv + val newBatch = + if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { + new ColumnarBatch(GpuColumnVector.extractColumns(batch).map { + cv => { + cv.dataType() match { + case DataTypes.TimestampType => new GpuColumnVector(DataTypes.TimestampType, + withResource(cv.getBase()) { v => + v.castTo(DType.TIMESTAMP_MILLISECONDS) + }) + case _ => cv + } } - } - }) - } else { - batch - } + }) + } else { + batch + } try { val writeStartTimestamp = System.nanoTime val writeRange = new NvtxRange("File write", NvtxColor.YELLOW) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 76f821e11a6..28576ad1ee2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -64,10 +64,9 @@ object GpuParquetFileFormat { TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType]) } if (schemaHasTimestamps) { - sqlConf.parquetOutputTimestampType match { - case ParquetOutputTimestampType.TIMESTAMP_MICROS => - case ParquetOutputTimestampType.TIMESTAMP_MILLIS => - case t => meta.willNotWorkOnGpu(s"Output timestamp type $t is not supported") + if(!isOutputTimestampTypeSupported(sqlConf.parquetOutputTimestampType)) { + meta.willNotWorkOnGpu(s"Output timestamp type " + + s"${sqlConf.parquetOutputTimestampType} is not supported") } } @@ -100,6 +99,15 @@ object GpuParquetFileFormat { case _ => None } } + + def isOutputTimestampTypeSupported( + outputTimestampType: ParquetOutputTimestampType.Value): Boolean = { + outputTimestampType match { + case ParquetOutputTimestampType.TIMESTAMP_MICROS | + ParquetOutputTimestampType.TIMESTAMP_MILLIS => true + case _ => false + } + } } class GpuParquetFileFormat extends ColumnarFileFormat with Logging { @@ -160,13 +168,13 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { sparkSession.sessionState.conf.writeLegacyParquetFormat.toString) val outputTimestampType = sparkSession.sessionState.conf.parquetOutputTimestampType - if (outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MICROS) { + if(!GpuParquetFileFormat.isOutputTimestampTypeSupported(outputTimestampType)) { val hasTimestamps = dataSchema.exists { field => TrampolineUtil.dataTypeExistsRecursively(field.dataType, f => { f.isInstanceOf[TimestampType] }) } - if (hasTimestamps && outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MILLIS) { + if (hasTimestamps) { throw new UnsupportedOperationException( s"Unsupported output timestamp type: $outputTimestampType") } From fbb0a43068cbc1d8130223b4e794b4a570eaa082 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 11 Sep 2020 13:18:05 -0700 Subject: [PATCH 4/8] code cleanup Signed-off-by: Raza Jafri --- 
.../scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 28576ad1ee2..2553c7002b3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -170,9 +170,7 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { val outputTimestampType = sparkSession.sessionState.conf.parquetOutputTimestampType if(!GpuParquetFileFormat.isOutputTimestampTypeSupported(outputTimestampType)) { val hasTimestamps = dataSchema.exists { field => - TrampolineUtil.dataTypeExistsRecursively(field.dataType, f => { - f.isInstanceOf[TimestampType] - }) + TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType]) } if (hasTimestamps) { throw new UnsupportedOperationException( From e52834777486071601460e28ef2a7439faaa9dd0 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 14 Sep 2020 15:59:50 -0700 Subject: [PATCH 5/8] rebased --- .../src/main/python/parquet_test.py | 12 ++++--- .../scala/com/nvidia/spark/RebaseHelper.scala | 3 +- .../spark/rapids/ColumnarOutputWriter.scala | 22 ++----------- .../spark/rapids/GpuParquetFileFormat.scala | 33 +++++++++++++++++-- .../spark/rapids/ParquetWriterSuite.scala | 16 ++++++++- 5 files changed, 57 insertions(+), 29 deletions(-) diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 0a074071ade..3137a2dfabd 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -291,7 +291,8 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, mt_opt): @pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn) @pytest.mark.parametrize('mt_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list): +@pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"]) +def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list, ts_type): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( @@ -299,11 +300,11 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list) lambda spark, path: spark.read.parquet(path), data_path, conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', - 'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS', + 'spark.sql.parquet.outputTimestampType': ts_type, 'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) -@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS']) +@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS']) @pytest.mark.parametrize('ts_rebase', ['CORRECTED']) @ignore_order def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): @@ -327,7 +328,8 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): @pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn) @pytest.mark.parametrize('mt_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def 
test_part_write_round_trip(spark_tmp_path, parquet_gen, mt_opt, v1_enabled_list): +@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS']) +def test_part_write_round_trip(spark_tmp_path, parquet_gen, mt_opt, v1_enabled_list, ts_type): gen_list = [('a', RepeatSeqGen(parquet_gen, 10)), ('b', parquet_gen)] data_path = spark_tmp_path + '/PARQUET_DATA' @@ -336,7 +338,7 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen, mt_opt, v1_enabled_l lambda spark, path: spark.read.parquet(path), data_path, conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', - 'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS', + 'spark.sql.parquet.outputTimestampType': ts_type, 'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala index 95a869c4e8c..8794ddf76d3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala @@ -18,7 +18,6 @@ package com.nvidia.spark import ai.rapids.cudf.{ColumnVector, DType, Scalar} import com.nvidia.spark.rapids.Arm - import org.apache.spark.sql.catalyst.util.RebaseDateTime import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.execution.TrampolineUtil @@ -39,7 +38,7 @@ object RebaseHelper extends Arm { } else if (dtype.isTimestamp) { assert(dtype == DType.TIMESTAMP_MICROSECONDS) withResource( - Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, startTs)) { minGood => + Scalar.timestampFromLong(column.getDataType, startTs)) { minGood => withResource(column.lessThan(minGood)) { hasBad => withResource(hasBad.any()) { a => a.getBoolean diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index 9465b5928a1..5c9e4255669 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -66,7 +66,6 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, val tableWriter: TableWriter val conf = context.getConfiguration - val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) private[this] val outputStream: FSDataOutputStream = { val hadoopPath = new Path(path) @@ -108,28 +107,13 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, */ def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = { var needToCloseBatch = true - val newBatch = - if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { - new ColumnarBatch(GpuColumnVector.extractColumns(batch).map { - cv => { - cv.dataType() match { - case DataTypes.TimestampType => new GpuColumnVector(DataTypes.TimestampType, - withResource(cv.getBase()) { v => - v.castTo(DType.TIMESTAMP_MILLISECONDS) - }) - case _ => cv - } - } - }) - } else { - batch - } + try { val writeStartTimestamp = System.nanoTime val writeRange = new NvtxRange("File write", NvtxColor.YELLOW) val gpuTime = try { needToCloseBatch = false - writeBatch(newBatch) + writeBatch(batch) } finally { writeRange.close() } @@ -144,7 +128,7 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, } } finally { if (needToCloseBatch) { - 
newBatch.close() + batch.close() } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 2553c7002b3..71d0a874b74 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -23,15 +23,16 @@ import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil - import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetWriteSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType +import org.apache.spark.sql.rapids.ColumnarWriteTaskStatsTracker import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.types.{DateType, StructType, TimestampType} +import org.apache.spark.sql.types.{DataTypes, DateType, StructType, TimestampType} +import org.apache.spark.sql.vectorized.ColumnarBatch object GpuParquetFileFormat { def tagGpuSupport( @@ -225,6 +226,8 @@ class GpuParquetWriter( context: TaskAttemptContext) extends ColumnarOutputWriter(path, context, dataSchema, "Parquet") { + val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) + override def scanTableBeforeWrite(table: Table): Unit = { if (dateTimeRebaseException) { (0 until table.getNumberOfColumns).foreach { i => @@ -235,6 +238,32 @@ class GpuParquetWriter( } } + /** + * Persists a columnar batch. Invoked on the executor side. When writing to dynamically + * partitioned tables, dynamic partition columns are not included in columns to be written. + * NOTE: It is the writer's responsibility to close the batch. 
+ */ + override def write(batch: ColumnarBatch, + statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = { + val newBatch = + if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { + new ColumnarBatch(GpuColumnVector.extractColumns(batch).map { + cv => { + cv.dataType() match { + case DataTypes.TimestampType => new GpuColumnVector(DataTypes.TimestampType, + withResource(cv.getBase()) { v => + v.castTo(DType.TIMESTAMP_MILLISECONDS) + }) + case _ => cv + } + } + }) + } else { + batch + } + + super.write(newBatch, statsTrackers) + } override val tableWriter: TableWriter = { val writeContext = new ParquetWriteSupport().init(conf) val builder = ParquetWriterOptions.builder() diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala index 55a2cb3596c..929157fe082 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetFileReader - import org.apache.spark.{SparkConf, SparkException} /** @@ -103,6 +102,21 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite { } } + testExpectedGpuException( + "Old timestamps millis in EXCEPTION mode", + classOf[SparkException], + oldTsDf, + new SparkConf() + .set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "EXCEPTION") + .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")) { + val tempFile = File.createTempFile("oldTimeStamp", "parquet") + tempFile.delete() + frame => { + frame.write.mode("overwrite").parquet(tempFile.getAbsolutePath) + frame + } + } + testExpectedGpuException( "Old timestamps in EXCEPTION mode", classOf[SparkException], From 592cc050fee07d8490843d378676a56ef1cd3f1b Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 14 Sep 2020 17:31:47 -0700 Subject: [PATCH 6/8] cleanup --- sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala index 8794ddf76d3..91e789df0fc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala @@ -38,7 +38,7 @@ object RebaseHelper extends Arm { } else if (dtype.isTimestamp) { assert(dtype == DType.TIMESTAMP_MICROSECONDS) withResource( - Scalar.timestampFromLong(column.getDataType, startTs)) { minGood => + Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, startTs)) { minGood => withResource(column.lessThan(minGood)) { hasBad => withResource(hasBad.any()) { a => a.getBoolean From c1d6d5f8ad7a55392811c8e91342188b6bc45995 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 14 Sep 2020 18:13:55 -0700 Subject: [PATCH 7/8] scalatest changes --- sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala | 1 + .../scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala | 1 + .../test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala | 1 + 3 files changed, 3 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala index 91e789df0fc..95a869c4e8c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/RebaseHelper.scala @@ -18,6 +18,7 @@ package com.nvidia.spark import ai.rapids.cudf.{ColumnVector, DType, Scalar} import com.nvidia.spark.rapids.Arm + import org.apache.spark.sql.catalyst.util.RebaseDateTime import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.execution.TrampolineUtil diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 71d0a874b74..8b7482ed72d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -23,6 +23,7 @@ import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil + import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.DataSourceUtils diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala index 929157fe082..2efdab41ba9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetFileReader + import org.apache.spark.{SparkConf, SparkException} /** From 7b32901839f21abadc92f6865a068d8144789dde Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 15 Sep 2020 10:18:49 -0700 Subject: [PATCH 8/8] reverting unnecessary changes Signed-off-by: Raza Jafri --- .../com/nvidia/spark/rapids/ColumnarOutputWriter.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index 5c9e4255669..e3b73932fb3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -18,16 +18,14 @@ package com.nvidia.spark.rapids import scala.collection.mutable -import ai.rapids.cudf.{DType, HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter} +import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.hadoop.fs.{FSDataOutputStream, Path} import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.TaskContext -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker} -import org.apache.spark.sql.types.{DataTypes, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -107,7 +105,6 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext, */ def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = { var needToCloseBatch = true - try { val writeStartTimestamp = System.nanoTime val 
writeRange = new NvtxRange("File write", NvtxColor.YELLOW)
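For reference, a consolidated sketch of the timestamp handling that the final revision of this series settles on in GpuParquetWriter.write: when spark.sql.parquet.outputTimestampType is TIMESTAMP_MILLIS, each TimestampType column (held by cudf as TIMESTAMP_MICROSECONDS) is cast to TIMESTAMP_MILLISECONDS before the batch reaches the table writer. This is an illustrative sketch only, not part of the patches; it reuses the cudf and plugin calls that appear in the diffs above, and the helper name and surrounding wiring are assumptions.

  // Sketch only: cast TimestampType columns to millisecond precision before writing.
  // Mirrors the approach introduced in PATCH 5/8; `castTimestampsToMillis` is an
  // illustrative name, not a method in the patch.
  private def castTimestampsToMillis(batch: ColumnarBatch): ColumnarBatch = {
    new ColumnarBatch(GpuColumnVector.extractColumns(batch).map { cv =>
      cv.dataType() match {
        case DataTypes.TimestampType =>
          // cudf stores Spark timestamps as TIMESTAMP_MICROSECONDS; castTo builds a new
          // TIMESTAMP_MILLISECONDS column, and withResource closes the original base column.
          new GpuColumnVector(DataTypes.TimestampType,
            withResource(cv.getBase()) { v => v.castTo(DType.TIMESTAMP_MILLISECONDS) })
        case _ => cv
      }
    })
  }

Non-timestamp columns pass through untouched, so the writer only pays the cast cost when the output type actually requires it.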