
Improve columnarCopy for HostColumnarToGpu #4770

Merged: 3 commits merged into NVIDIA:branch-22.04 on Feb 16, 2022

Conversation

@sperlingxx (Collaborator) commented Feb 14, 2022

Signed-off-by: sperlingxx lovedreamf@gmail.com

Following the suggestion in #4393, this PR replaces the inefficient Scala for loops used for the columnar copy with Java ones, in order to speed up columnar-to-columnar transfers. In addition, it uses ColumnVector.hasNull to determine whether the host column data actually contains any null value, which is more accurate than the nullable flag from the read schema.
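
The change, in rough terms, boils down to per-type static copy helpers written in Java that hoist the null check out of the loop. Below is a minimal sketch of the pattern for a LONG column; it is illustrative only, not the PR's code. hasNull, isNullAt, and getLong are the standard Spark ColumnVector accessors, while the ColumnBuilder append/appendNull calls are assumed to follow the builder usage visible in the booleanCopy excerpt quoted in the review below.

import ai.rapids.cudf.HostColumnVector.ColumnBuilder;
import org.apache.spark.sql.vectorized.ColumnVector;

class ColumnarCopySketch {
  // Copy `rows` LONG values from a host-side Spark ColumnVector into a cudf
  // ColumnBuilder. When hasNull() reports no nulls, the per-row null check
  // is skipped entirely; otherwise nulls are forwarded explicitly.
  public static void longCopy(ColumnVector cv, ColumnBuilder b, int rows) {
    if (!cv.hasNull()) {
      for (int i = 0; i < rows; i++) {
        b.append(cv.getLong(i));
      }
      return;
    }
    for (int i = 0; i < rows; i++) {
      if (cv.isNullAt(i)) {
        b.appendNull();
      } else {
        b.append(cv.getLong(i));
      }
    }
  }
}

Checking hasNull() on the actual batch (rather than relying on the read schema's nullable flag) lets even nominally nullable columns take the fast branch whenever a particular batch happens to contain no nulls.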

According to a non-comprehensive test, the new implementation roughly halves the time spent on the columnar copy compared to the original implementation.

Here is the test script, which generates roughly 1 billion rows of Long data with nulls:

@allow_non_gpu('FileSourceScanExec')
def test_hc2dc_perf():
    from pyspark.sql.functions import col, sum

    def gen_data(spark, data_path, n_part=10, rows_per_part=100000):
        from pyspark.sql.types import LongType, StructField, StructType

        rdd = spark.sparkContext.parallelize(list(range(n_part)), numSlices=n_part)

        def rand_gen(seed_iter):
            from random import Random
            from pyspark.sql import Row
            rd = Random(next(seed_iter))
            for _ in range(rows_per_part):
                # About 1 in 11 iterations also emits a null row before the random
                # value, so the total row count slightly exceeds n_part * rows_per_part.
                if rd.randint(0, 10) == 0:
                    yield Row(a=None)
                yield Row(a=rd.randint(0, 100000000000000000))

        rows = rdd.mapPartitions(rand_gen)
        df = spark.createDataFrame(rows, StructType([StructField('a', LongType(), True)]))
        df.write.parquet(data_path)

    path = 'PARQUET_DATA_1234'
    with_cpu_session(lambda spark: gen_data(spark, path, n_part=1000, rows_per_part=1000000))
    # Disabling the GPU Parquet reader keeps the scan on the CPU, so the resulting
    # host batches are transferred through HostColumnarToGpu.
    with_gpu_session(
        lambda spark: spark.read.parquet(path).agg(sum(col('a') % 3)).collect(),
        conf={'spark.rapids.sql.format.parquet.enabled': 'false'})

The performance of the original implementation (the new metric "buffer time" records the exact time spent on the columnar copy):

HostColumnarToGpu
concat batch time total (min, med, max): 5.4 s (18 ms, 42 ms, 208 ms)
stream time total (min, med, max): 58.5 s (278 ms, 609 ms, 1.2 s)
output columnar batches: 91
peak device memory: 8,863,707,456
op time total (min, med, max): 1.7 m (565 ms, 1.1 s, 1.6 s)
output rows: 1,090,917,512
buffer time total (min, med, max): 37.5 s (266 ms, 407 ms, 608 ms)

The performance of the new implementation:

HostColumnarToGpu
concat batch time total (min, med, max): 4.8 s (17 ms, 44 ms, 137 ms)
stream time total (min, med, max): 43.8 s (240 ms, 437 ms, 1.0 s)
output columnar batches: 91
peak device memory: 8,863,707,456
op time total (min, med, max): 1.1 m (339 ms, 711 ms, 1.2 s)
output rows: 1,090,917,512
buffer time total (min, med, max): 21.1 s (102 ms, 227 ms, 320 ms)

@sperlingxx (Collaborator, Author) commented: build

@revans2 (Collaborator) left a comment:

Just some nits. Nothing that is a blocker. Great work.


public static void booleanCopy(ColumnVector cv, ColumnBuilder b, int rows) {
  if (!cv.hasNull()) {
    for (int i = 0; i < rows; i++) b.append(cv.getBoolean(i));
Collaborator:

nit: I would prefer to split this line up for formatting purposes.

for (int i = 0; i < rows; i++) {
  b.append(cv.getBoolean(i));
}

Collaborator:

It looks like we could benefit from an efficient bulk API in HostMemoryBuffer

Collaborator:

This ColumnVector is the Spark class, not the cuDF class. In some cases we can know what the layout is, but not in all, and in many of those cases the underlying data is not exposed, so there is no good way to do a bulk data copy. We could make changes to Spark to try to improve that, but this only really shows up when we are reading an input format like Parquet or ORC.

@sperlingxx (Collaborator, Author) commented Feb 15, 2022:

Done.

@sperlingxx (Collaborator, Author) commented: build

@sperlingxx merged commit 73ad70d into NVIDIA:branch-22.04 on Feb 16, 2022
@sperlingxx deleted the speedup_hc2dc branch on February 16, 2022 00:50
@sameerz added the performance label (A performance related task/issue) on Feb 18, 2022
@sameerz added this to the Feb 14 - Feb 25 milestone on Feb 18, 2022