parquet writer support for TIMESTAMP_MILLIS #726

Merged
8 commits merged into NVIDIA:branch-0.3 on Sep 15, 2020

Conversation

razajafri (Collaborator)

Signed-off-by: Raza Jafri rjafri@nvidia.com

This adds support for writing TIMESTAMP_MILLIS in the Parquet writer.

@jlowe PTAL as you are the original author

fixes #142

"false")
val newBatch: ColumnarBatch = if (castToMillis.equals("true")) {
new ColumnarBatch(GpuColumnVector.extractColumns(batch).map(cv => {
if (cv.dataType() == DataTypes.TimestampType) {
Collaborator:

Nit: use `map { ... }`.

@@ -105,12 +105,27 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
*/
def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
var needToCloseBatch = true
val castToMillis = conf.get(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS,
"false")
val newBatch: ColumnarBatch = if (castToMillis.equals("true")) {
Collaborator:

We can do a boolean check like we do for the other RAPIDS confs instead of a string equality check.
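
A minimal sketch of that suggestion, assuming `conf` is the Hadoop `Configuration` from the task attempt context as in the diff above; Hadoop can parse the boolean itself, so the string comparison goes away:

```scala
import org.apache.hadoop.conf.Configuration

// Sketch only: read the flag once as a boolean via Configuration.getBoolean
// instead of comparing strings. The key is the one added in this PR.
def shouldCastToMillis(conf: Configuration): Boolean =
  conf.getBoolean(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS, false)
```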

@@ -34,6 +34,8 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DateType, StructType, TimestampType}

object GpuParquetFileFormat {
val PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS = "com.nvidia.spark.rapids.parquet.write.castToMillis"
Collaborator:

Should this be in RapidsConf, where all the other confs are?

Member:

I don't think we need a new config here. We should just do what Spark is doing here and use the same SQLConf config. That way if we add INT96 support it's straightforward. Making this a boolean means we'll have to update it if/when INT96 or other types are supported. Let's simplify and use the Spark conf key which precludes the need for a new conf key.
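
For reference, Spark's own `ParquetFileFormat.prepareWrite` roughly copies the session-level setting into the Hadoop `Configuration` the writers see. A hedged sketch of doing the same here, assuming a `SparkSession` and the writer's Hadoop conf are in hand:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Sketch: propagate Spark's existing SQLConf setting into the Hadoop
// Configuration, mirroring what ParquetFileFormat.prepareWrite does,
// rather than introducing a plugin-specific key.
def propagateOutputTimestampType(spark: SparkSession, conf: Configuration): Unit =
  conf.set(
    SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
    spark.sessionState.conf.parquetOutputTimestampType.toString)
```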

data_path = spark_tmp_path + '/PARQUET_DATA'
with_gpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MILLIS'})
Collaborator:

Nit: could we parameterize this so that in the future, when we hopefully support INT96, testing it too is a very small change?

@@ -125,6 +125,15 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt,
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

def test_parquet_write_ts_millis(spark_tmp_path):
gen = TimestampGen()
Collaborator:

What are we doing to avoid ts_rebaseModeIn*? We cannot support the full range of write/read options for timestamps unless something has changed recently.

Collaborator (Author):

We might not be hitting ambiguous dates when generating the TimestampGen data in this test, since the tests are passing.

Collaborator (Author):

Upon digging into this further, I am a little confused as to why the tests were passing without setting ts_rebaseModeIn*. Per the documentation, the default mode is EXCEPTION, which should throw an exception when it encounters an ambiguous date, but when I don't set ts_rebaseModeIn* it behaves as if I had set the value to CORRECTED. Upon explicitly setting the value to EXCEPTION, the test threw an exception. I have made the change to explicitly set ts_rebaseModeIn* to CORRECTED. Please let me know if this is acceptable or if you had something else in mind.
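
For context, a hedged sketch of pinning the rebase mode explicitly; the key names assume the Spark 3.0 legacy confs, and CORRECTED means timestamps are written and read as-is in the proleptic Gregorian calendar with no rebasing:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: pin the rebase mode so the test does not depend on defaults.
// Key names are assumed to be the Spark 3.0 legacy conf keys.
def withCorrectedRebase(spark: SparkSession): Unit = {
  spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
  spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
}
```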

@@ -162,11 +164,15 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
val outputTimestampType = sparkSession.sessionState.conf.parquetOutputTimestampType
if (outputTimestampType != ParquetOutputTimestampType.TIMESTAMP_MICROS) {
Collaborator:

This is really convoluted code now. Could we just use a match on the output type? And why do we need to set a second config to reflect the value of the first one?

Member:

+1 we should just port the SQLConf key/value pair into the Hadoop Configuration as Spark does.

Collaborator:

Still not doing what I asked.

sparkSession.sessionState.conf.parquetOutputTimestampType match {
  case ParquetOutputTimestampType.TIMESTAMP_MICROS =>
  case ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
  case outputTimestampType =>
    val hasTimestamps = dataSchema.exists { field =>
      TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
    }
    if (hasTimestamps) {
      throw new UnsupportedOperationException(
        s"Unsupported output timestamp type: $outputTimestampType")
    }
}

Because this code is doing almost exactly the same thing as the code above, we might be able to combine the two.
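
For instance, the two supported cases in the sketch above can be collapsed with a pattern alternation, reusing the names and imports from that snippet. This is only an illustration; the duplicated code elsewhere in the file is not shown here, so it may not be the exact consolidation the reviewer had in mind:

```scala
sparkSession.sessionState.conf.parquetOutputTimestampType match {
  // Both supported types need no extra handling here.
  case ParquetOutputTimestampType.TIMESTAMP_MICROS |
       ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
  case outputTimestampType =>
    val hasTimestamps = dataSchema.exists { field =>
      TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
    }
    if (hasTimestamps) {
      throw new UnsupportedOperationException(
        s"Unsupported output timestamp type: $outputTimestampType")
    }
}
```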

@@ -105,12 +105,27 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
*/
def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
var needToCloseBatch = true
val castToMillis = conf.get(GpuParquetFileFormat.PARQUET_WRITE_TIMESTAMP_CAST_TO_MILLIS,
Member:

This should not look up the config on each batch, only once when the writer is created. It's wasteful to redundantly re-parse the config on every batch.



@sameerz added the "feature request" (New feature or request) label on Sep 11, 2020
@@ -105,12 +107,28 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
*/
def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
var needToCloseBatch = true
val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
Member:

Still looking up the config on every batch; this should be done once when the ColumnarOutputWriter instance is created.
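
A sketch of the requested shape, with the constructor abbreviated from the diff context above: resolve the value once when the writer is constructed, then reuse the cached field for every call to write().

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.internal.SQLConf

// Sketch: the output timestamp type is resolved once per writer instance,
// not re-read and re-parsed on every batch inside write().
abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext) {
  private val conf = context.getConfiguration

  // Looked up and parsed exactly once, when the writer is created.
  protected val outputTimestampType: String =
    conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
}
```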

@@ -105,12 +107,28 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
*/
def write(batch: ColumnarBatch, statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
var needToCloseBatch = true
val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
val newBatch = if (outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS) {
Member:

Does this work? It looks like we're comparing a string with an enum here.
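
In Scala, comparing a String against an Enumeration value compiles but always evaluates to false, so this condition could never be true. A hedged sketch of one possible fix, reusing `conf` from the diff context above:

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType

// Sketch: the Hadoop conf stores the setting as a String, so compare
// against the enum value's string form rather than the enum value itself.
val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
val castToMillis =
  outputTimestampType == ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString
```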

Collaborator (Author):

Thank you for pointing this out!

@razajafri (Collaborator, Author)

Thanks for the review @jlowe @revans2 @kuhushukla, can you please take another look? @revans2 I thought about merging this test with the other write_test, but I stuck with this because it keeps things simple.

@razajafri (Collaborator, Author)

build

1 similar comment
@razajafri (Collaborator, Author)

build

@revans2 (Collaborator) commented Sep 11, 2020

I also just noticed that all of the changes are in generic code, so setting a Parquet config could impact ORC output.

@revans2 (Collaborator) commented Sep 14, 2020

@razajafri things are looking off after the upmerge. Not sure if GitHub is confused or what, but it shows 50 files have changed.

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator, Author)

build

@razajafri (Collaborator, Author)

@revans2 I apologize, but I had to rebase. scanTableBeforeWrite is called before write, so all the columns should be in MICROS, which precludes the need to convert the columns before checking when the switch happened.

I might be missing something very basic that you are trying to point out.

@razajafri (Collaborator, Author)

build

revans2 previously approved these changes Sep 15, 2020
@@ -18,14 +18,16 @@ package com.nvidia.spark.rapids

import scala.collection.mutable

import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
import ai.rapids.cudf.{DType, HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
Collaborator:

Can we revert all of the changes to this file? It looks like none of them are needed.

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
@razajafri (Collaborator, Author)

build

@revans2 (Collaborator) commented Sep 15, 2020

Odd, the CI job says it passed, but GitHub does not show it as passed...

@razajafri (Collaborator, Author)

Can we force merge, or do we need to build again?

revans2 merged commit 3219fa4 into NVIDIA:branch-0.3 on Sep 15, 2020
@revans2 (Collaborator) commented Sep 15, 2020

Bypassed the GitHub checks because the CI build passed, but it looked like it didn't update GitHub, despite a log message in the job saying it did.

nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
Signed-off-by: Raza Jafri <rjafri@nvidia.com>
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this pull request Nov 30, 2023
…IDIA#726)

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Labels: feature request (New feature or request)
Projects: None yet
Development: Successfully merging this pull request may close these issues:
[FEA] Support TIMESTAMP_MILLIS for spark.sql.parquet.outputTimestampType
5 participants