Avoid requiring single batch when using out-of-core sort #5903
Conversation
Signed-off-by: Chong Gao <res_life@163.com>
build
build
Verified with a large data frame; OOM did not occur after disabling the setting. I will file a follow-on issue to explore support for something like it.
Thanks, @wjxiz1992 verified this PR against the corresponding NV bug: NDS 2.0 convert CSV to Parquet failed with OOM.
Filed a follow-on issue.
@revans2 Please help review.
@@ -465,3 +465,14 @@ def test_write_daytime_interval(spark_tmp_path):
                  lambda spark, path: spark.read.parquet(path),
                  data_path,
                  conf=writer_confs)
# TODO need to test large DF
We simulate a large DF by setting the batch size to be very small. This lets us send multiple batches.
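A hedged sketch of that trick: shrink the plugin's batch size so that even small test data is split into many batches on the write path (`spark.rapids.sql.batchSizeBytes` is the plugin's batch-size setting; the base confs below are illustrative stand-ins, not the suite's real `writer_confs`):

```python
# Illustrative base writer confs; the real test uses the suite's writer_confs.
writer_confs = {"spark.sql.sources.useV1SourceList": "parquet"}

# Shrink the GPU batch size so the write path receives multiple batches,
# simulating a large DataFrame with small test data.
small_batch_conf = {**writer_confs, "spark.rapids.sql.batchSizeBytes": "100"}
```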
@@ -36,7 +36,8 @@ case class GpuCreateDataSourceTableAsSelectCommand(
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    origProvider: Class[_],
    gpuFileFormat: ColumnarFileFormat,
    useStableSort: Boolean)
My only nit is that we pass useStableSort around this code a lot, but in the final part when we do the sort we get it from a different location.
spark-rapids/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
Lines 207 to 211 in 4a313e7
val sortType = if (RapidsConf.STABLE_SORT.get(plan.conf)) {
  FullSortSingleBatch
} else {
  OutOfCoreSort
}
Could we please make it consistent? Either we pass it all the way down all the time, or we go off of the plan.conf all the time.
Updated, see the below comment.
build
Closes #5448
Problem
The cause is that the input to the out-of-core sort is a single large batch, and that single large batch caused the OOM.
The out-of-core sort should not require a single batch; it can pull all the input batches and then sort them.
Before the out-of-core sort feature was added, we did need a single batch for the in-memory sort.
Solution
When executing partitioned writes that require sorting, we now use the out-of-core sort and do not require a single batch. Note: if the stable sort configuration is specified, a single batch is still required, as before.

Signed-off-by: Chong Gao res_life@163.com
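The sort-type selection quoted in the review above can be sketched as a pure function (a minimal sketch; `SortType` and the two case objects here are simplified stand-ins for the plugin's actual types):

```scala
// Minimal sketch: stable sort needs the whole input as one batch, while the
// out-of-core sort can consume the input batch by batch.
sealed trait SortType
case object FullSortSingleBatch extends SortType
case object OutOfCoreSort extends SortType

def chooseSortType(useStableSort: Boolean): SortType =
  if (useStableSort) FullSortSingleBatch else OutOfCoreSort
```

Passing `useStableSort` into a function like this (rather than re-reading the conf at the sort site) is one way to address the consistency nit raised in the review.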