parquet writer support for TIMESTAMP_MILLIS #726
Conversation
"false") | ||
val newBatch: ColumnarBatch = if (castToMillis.equals("true")) { | ||
new ColumnarBatch(GpuColumnVector.extractColumns(batch).map(cv => { | ||
if (cv.dataType() == DataTypes.TimestampType) { |
Nit: use map { ... } here.
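For reference, a tiny standalone sketch of the brace style the nit points at (a Seq[Int] stands in for the column vectors in the diff above):

```scala
// Minimal sketch of the style nit: multi-line lambdas conventionally use
// map { ... } rather than map(x => { ... }).
val cols = Seq(1, 2, 3)
val doubled = cols.map { c =>
  // a multi-line body reads more naturally inside braces
  c * 2
}
```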
```diff
@@ -105,12 +105,27 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
    */
   def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
     var needToCloseBatch = true
+    val castToMillis = conf.get(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS,
+      "false")
+    val newBatch: ColumnarBatch = if (castToMillis.equals("true")) {
```
We can do a boolean check like we do for other rapids confs instead of a string equality check.
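A minimal sketch of that suggestion, assuming conf is a Hadoop Configuration and using the key literal from the diff above:

```scala
import org.apache.hadoop.conf.Configuration

// Read the flag through Hadoop's boolean accessor instead of comparing
// strings; getBoolean parses "true"/"false" and otherwise returns the default.
val conf = new Configuration()
val castToMillis: Boolean =
  conf.getBoolean("com.nvidia.spark.rapids.parquet.write.castToMillis", false)
```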
```diff
@@ -34,6 +34,8 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
 
+object GpuParquetFileFormat {
+  val PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS = "com.nvidia.spark.rapids.parquet.write.castToMillis"
```
Should this be in RapidsConf, where all the other confs are?
I don't think we need a new config here. We should just do what Spark is doing and use the same SQLConf config; that way, if we add INT96 support, it's straightforward. Making this a boolean means we'll have to update it if/when INT96 or other types are supported. Let's simplify and use the Spark conf key, which precludes the need for a new one.
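A hedged sketch of what reusing the Spark conf could look like, assuming the session's value is copied into the task's Hadoop Configuration (the helper name is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.internal.SQLConf

// Port Spark's existing SQLConf entry into the Hadoop Configuration the writer
// sees, instead of introducing a plugin-specific key. The string value is the
// enum name, e.g. "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS", or "INT96".
def propagateOutputTimestampType(conf: Configuration, outputTimestampType: String): Unit =
  conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, outputTimestampType)
```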
```python
data_path = spark_tmp_path + '/PARQUET_DATA'
with_gpu_session(
    lambda spark: unary_op_df(spark, gen).write.parquet(data_path),
    conf={'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MILLIS'})
```
Nit: could we parameterize this so that in the future, when we hopefully support INT96, testing that too is a very small change?
```diff
@@ -125,6 +125,15 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt,
             conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
+def test_parquet_write_ts_millis(spark_tmp_path):
+    gen = TimestampGen()
```
What are we doing to avoid ts_rebaseModeIn*? We cannot support the full range of write/read options for timestamps unless something has changed recently.
We might not be hitting the ambiguous dates when generating data with TimestampGen in this test, since the tests are passing.
Upon digging into this further, I am a little confused as to why the tests were passing without setting ts_rebaseModeIn*. Per the documentation, the default mode is EXCEPTION, which should throw an exception if it encounters an ambiguous date, but when I don't set ts_rebaseModeIn* it behaves as if I had set the value to CORRECTED. Upon explicitly setting the value to EXCEPTION, the test threw an exception. I have made the change to explicitly set ts_rebaseModeIn* to CORRECTED. Please let me know if this is acceptable or if you had something else in mind.
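For context, a minimal Scala sketch of pinning the rebase mode explicitly; the conf key below is the Spark 3.0-era legacy name (the integration tests set the equivalent through ts_rebaseModeIn*):

```scala
import org.apache.spark.sql.SparkSession

// Pin the write-side rebase mode so a test does not depend on the default.
// CORRECTED writes timestamps as-is; EXCEPTION fails on ambiguous ancient dates.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
```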
```diff
@@ -162,11 +164,15 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
     val outputTimestampType = sparkSession.sessionState.conf.parquetOutputTimestampType
     if (outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MICROS) {
```
This is really convoluted code now. Could we just use a match on the output type? And why do we need to set a second config to reflect the value of the first one?
+1 we should just port the SQLConf key/value pair into the Hadoop Configuration as Spark does.
Still not doing what I asked.
```scala
sparkSession.sessionState.conf.parquetOutputTimestampType match {
  case ParquetOutputTimestampType.TIMESTAMP_MICROS =>
  case ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
  case outputTimestampType =>
    val hasTimestamps = dataSchema.exists { field =>
      TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
    }
    if (hasTimestamps) {
      throw new UnsupportedOperationException(
        s"Unsupported output timestamp type: $outputTimestampType")
    }
}
```
Because this code is doing almost exactly the same thing as the block above, we might be able to combine them.
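One hedged way to combine them: factor the accepted types into a single helper and call it from both places (the shape and name are illustrative, not the plugin's API):

```scala
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType

// Single point of truth for which Parquet output timestamp types the GPU
// writer can produce; both validation sites could delegate to this.
def isOutputTimestampTypeSupported(
    outputType: ParquetOutputTimestampType.Value): Boolean =
  outputType match {
    case ParquetOutputTimestampType.TIMESTAMP_MICROS |
         ParquetOutputTimestampType.TIMESTAMP_MILLIS => true
    case _ => false
  }
```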
```diff
@@ -105,12 +105,27 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
    */
   def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
     var needToCloseBatch = true
+    val castToMillis = conf.get(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS,
```
This should not look up the config on each batch, but only once when the writer is created. It's wasteful to redundantly re-parse the config on every batch.
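A minimal sketch of hoisting the lookup to construction time (the class shape is illustrative, not the plugin's actual API):

```scala
import org.apache.hadoop.conf.Configuration

// Resolve the setting once when the writer is created; write() then reuses
// the cached value instead of re-parsing the Hadoop conf for every batch.
class WriterLike(conf: Configuration) {
  private val castToMillis: Boolean =
    conf.getBoolean("com.nvidia.spark.rapids.parquet.write.castToMillis", false)

  def write(): Unit = {
    if (castToMillis) {
      // down-cast timestamp columns to millis here
    }
  }
}
```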
```diff
@@ -105,12 +107,28 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
    */
   def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
     var needToCloseBatch = true
+    val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
```
Still looking up the config on every batch; this should be done once when the ColumnarOutputWriter instance is created.
```diff
@@ -105,12 +107,28 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
    */
   def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
     var needToCloseBatch = true
+    val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
+    val newBatch = if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS) {
```
Does this work? It looks like we're comparing a string with an enum here.
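For illustration, a sketch of why that comparison misfires and one hedged fix (the helper name is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType

// conf.get returns a String, while TIMESTAMP_MILLIS is an Enumeration value,
// so == between them is always false. Comparing against the enum's string
// form restores the intended check.
def shouldCastToMillis(conf: Configuration): Boolean =
  conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key) ==
    ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString
```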
Thank you for pointing this out!
Thanks for the review @jlowe @revans2 @kuhushukla, can you please take another look? @revans2 I thought about merging this test with the other write test, but I stuck with this because it keeps it simple.
build
I also just noticed that all of the changes are in generic code, so setting a Parquet config could impact ORC output.
@razajafri things are looking off after the upmerge. Not sure if GitHub is confused or what, but it shows 50 files have changed.
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
Force-pushed from 9747cc7 to 592cc05.
build
@revans2 I apologize but I had to rebase. I might be missing something very basic that you are trying to point out.
build
```diff
@@ -18,14 +18,16 @@ package com.nvidia.spark.rapids
 
 import scala.collection.mutable
 
-import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
+import ai.rapids.cudf.{DType, HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
```
Can we revert all of the changes to this file? It looks like none of them are needed.
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
build
Odd, the CI job says it passed, but GitHub does not show it as passed...
Can we force merge, or do we need to build again?
Bypassed the GitHub checks because the CI build passed; it looked like the job didn't update GitHub, despite a log message in the job saying it did.
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
…IDIA#726) Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Signed-off-by: Raza Jafri rjafri@nvidia.com
This adds support for writing TIMESTAMP_MILLIS to the Parquet writer.
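A hedged usage sketch of what this enables, using Spark's own conf key (the app name, path, and data are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// With this change, writes requesting TIMESTAMP_MILLIS output should stay on
// the GPU instead of falling back to the CPU Parquet writer.
val spark = SparkSession.builder().appName("ts-millis-demo").getOrCreate()
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
spark.range(10).selectExpr("current_timestamp() AS ts")
  .write.mode("overwrite").parquet("/tmp/ts_millis_output")
```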
@jlowe PTAL as you are the original author
fixes #142