
Support DayTimeIntervalType in ParquetCachedBatchSerializer[databricks] #4926

Merged 5 commits into NVIDIA:branch-22.04 on Mar 15, 2022

Conversation

@firestarman (Collaborator) commented on Mar 10, 2022:

This PR adds support for the new DayTimeIntervalType in ParquetCachedBatchSerializer for Spark v3.3.0+, along with some tests.

closes #4148
closes #4931

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
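
For context, a minimal usage sketch (not part of this PR), assuming the RAPIDS plugin jar is on the classpath: PCBS is enabled through spark.sql.cache.serializer, and caching a DataFrame with a DayTimeIntervalType column then exercises the new code paths. The class name, confs, and the make_dt_interval expression below follow the plugin and Spark documentation of this era and are assumptions here, not code from this PR.

# A hedged sketch, not from this PR: cache a DayTimeIntervalType column through PCBS.
# The serializer class name is the one documented for the plugin at the time and may
# differ in later releases.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.sql.cache.serializer",
                 "com.nvidia.spark.ParquetCachedBatchSerializer")
         .getOrCreate())

# make_dt_interval(days, hours, mins, secs) builds a DayTimeIntervalType column
# (available since Spark 3.2.0).
df = spark.range(1024).selectExpr("make_dt_interval(0, 0, 0, id) AS itvl", "id")
df.cache().count()            # materialize the cache through PCBS
df.selectExpr("itvl").show()  # read the cached batches back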

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator Author) commented:

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator Author) commented:

build

ids=["rapids_memoryscan_on", "rapids_memoryscan_off"])
@pytest.mark.parametrize('with_rapids_reader', ['true', 'false'],
ids=["rapids_reader_on", "rapids_reader_off"])
def test_cache_daytimeinterval_input_columnar(spark_tmp_path, alongside_gen,
Reviewer (Collaborator) commented:

Why is this test method so complicated? What are we doing in this test that we aren't doing in any other test? We have a DataFrame that we cache and then pull back from the cache, and we compare the result from the CPU and the GPU.

In other words, why don't we just do the following?

def test_cache_daytimeinterval_input_columnar():
    def func(spark):
        df = two_col_df(spark, DayTimeIntervalGen(), alongside_gen)
        df.cache().count()
        return df.selectExpr("a")  # or whatever the column name is

    assert_gpu_and_cpu_are_equal_collect(func, conf={YOUR CONF})

I could be missing something
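
For reference, a fleshed-out sketch of that suggestion, assuming the helpers from this repo's integration_tests python modules (two_col_df, long_gen, DayTimeIntervalGen, assert_gpu_and_cpu_are_equal_collect) and a placeholder conf dict; the final test merged in this PR may differ:

# A hedged sketch of the suggestion above, not the final test in this PR.
# two_col_df, long_gen, DayTimeIntervalGen and assert_gpu_and_cpu_are_equal_collect
# are assumed to come from the integration_tests python modules; the conf dict is a
# placeholder for whatever the real test parametrizes over.
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import two_col_df, long_gen, DayTimeIntervalGen

enable_vectorized_conf = {
    "spark.sql.inMemoryColumnarStorage.enableVectorizedReader": "true"}

def test_cache_daytimeinterval():
    def func(spark):
        # two_col_df names its columns 'a' and 'b'
        df = two_col_df(spark, DayTimeIntervalGen(), long_gen)
        df.cache().count()
        return df.selectExpr("a")

    assert_gpu_and_cpu_are_equal_collect(func, conf=enable_vectorized_conf)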

@firestarman (Collaborator Author) commented on Mar 11, 2022:

My thinking was that since we implement all the paths (both CPU and GPU) in the PCBS, the output of every path should equal the original data. Comparing the GPU output to the CPU output relies on the CPU output of the PCBS being reliable and equal to the Spark output.
Maybe it is over-designed.

@firestarman (Collaborator Author) commented on Mar 11, 2022:

Reading data from Parquet and writing it back is to get a columnar input to be cached, and then to convert the cached batches back to columnar batches.
I do the same as your suggestion in the test_cache_daytimeinterval_input_row test, but that test only checks the conversion paths between internal rows and cached batches.

@firestarman (Collaborator Author) commented:

Simplified this test

Reviewer (Collaborator) commented:

Reading/writing Parquet isn't necessary to write/read the columnar cache.

If you look at InMemoryTableScanExec, it calls convertColumnarBatchToCachedBatch when the Spark plan supports columnar output and the serializer supports columnar input (which it always does here).

Reading the cache as columnar in PCBS depends on three things: whether the conf spark.sql.inMemoryColumnarStorage.enableVectorizedReader is enabled, whether the plan has 100 or fewer columns, and whether every output column of the plan is an AtomicType or NullType.
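
To make those conditions concrete, a hedged sketch (not from this PR) of driving the columnar cache read from PySpark, assuming a session already configured with PCBS as in the sketch near the top of this thread; the conf name is the Spark one quoted above, while the column-count and type checks happen inside the serializer:

# Hedged sketch, not from this PR. The vectorized (columnar) cache read is only
# used when:
#   1. spark.sql.inMemoryColumnarStorage.enableVectorizedReader is true,
#   2. the cached plan has at most 100 columns, and
#   3. every output column is an AtomicType or NullType
#      (DayTimeIntervalType is an AtomicType in Spark 3.3.0+).
from pyspark.sql import SparkSession

# Assumes PCBS is already configured as the cache serializer (see the earlier sketch).
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "true")

df = spark.range(1000).selectExpr("make_dt_interval(0, 0, 0, id) AS itvl")
df.cache().count()   # write the cached batches
df.collect()         # read them back; columnar when all three conditions hold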

@firestarman (Collaborator Author) commented on Mar 15, 2022:

Good to know. Just got home and updated it.
I merged the two tests into one and updated the test function per your suggestion above.

@sameerz added the audit_3.3.0 (Audit related tasks for 3.3.0) label on Mar 11, 2022
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator Author) commented:

build

// will be reused, and Spark expects the producer to close its batches.
val numRows = batch.numRows()
val gcbBuilder = new GpuColumnarBatchBuilder(structSchema, numRows)
for (i <- 0 until batch.numCols()) {
Reviewer (Collaborator) commented:

You can use the following to improve performance:

var rowIndex = 0
while (rowIndex < batch.numRows()) {

    ......
    rowIndex += 1
}

A similar PR: #4770

@firestarman (Collaborator Author) commented on Mar 14, 2022:

The number of rows is usually quite large, so that change can improve performance. However, this loop is over columns, so the suggestion brings little benefit here, since the number of columns is usually small.

@firestarman (Collaborator Author) commented:

@razajafri Could you review this again?

@firestarman (Collaborator Author) commented:

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman (Collaborator Author) commented:

build

@firestarman changed the title from "Support DayTimeIntervalType in ParquetCachedBatchSerializer" to "Support DayTimeIntervalType in ParquetCachedBatchSerializer[databricks]" on Mar 15, 2022
@razajafri merged commit 49da35a into NVIDIA:branch-22.04 on Mar 15, 2022
@firestarman deleted the pcbs_ansi_itvl branch on March 16, 2022 00:44
@sameerz added this to the Feb 28 - Mar 18 milestone on Mar 18, 2022
Labels: audit_3.3.0 (Audit related tasks for 3.3.0)
Projects: None yet
Participants: 4