From c199aaa8f48d3c882f8f3d06079d9a8d241def1e Mon Sep 17 00:00:00 2001
From: Firestarman
Date: Wed, 23 Sep 2020 17:15:06 +0800
Subject: [PATCH 1/2] Fix a hanging issue when processing empty data.

The output iterator waits on the batch queue when `hasNext` is called,
and is supposed to be woken up when the Python runner inserts something
into the batch queue. But if the input data is empty, the insertion
never happens, so the iterator hangs forever.

The solution is to have the Python runner always wake up the output
iterator once it finishes writing the data, by calling the newly added
API `finish()`.

Signed-off-by: Firestarman
---
 .../execution/python/GpuArrowEvalPythonExec.scala | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
index 74dd87160b9..4995d8491aa 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
@@ -187,6 +187,14 @@ class BatchQueue extends AutoCloseable with Arm {
     }
   }
 
+  def finish(): Unit = synchronized {
+    if (!isSet) {
+      // Wake up anyone waiting for the first batch.
+      isSet = true
+      notifyAll()
+    }
+  }
+
   def remove(): ColumnarBatch = synchronized {
     if (queue.isEmpty) {
       null
@@ -369,7 +377,8 @@ class GpuArrowPythonRunner(
     schema: StructType,
     timeZoneId: String,
     conf: Map[String, String],
-    batchSize: Long)
+    batchSize: Long,
+    onDataWriteFinished: () => Unit)
   extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
     with GpuPythonArrowOutput {
 
@@ -431,6 +440,7 @@ class GpuArrowPythonRunner(
         } {
           writer.close()
           dataOut.flush()
+          if (onDataWriteFinished != null) onDataWriteFinished()
        }
      }
    }
@@ -587,7 +597,8 @@ case class GpuArrowEvalPythonExec(
             schema,
             sessionLocalTimeZone,
             pythonRunnerConf,
-            batchSize){
+            batchSize,
+            () => queue.finish()){
           override def minReadTargetBatchSize: Int = targetReadBatchSize
         }.compute(projectedIterator,
           context.partitionId(),

From fa7d54506107c39b4d4edc592f156536b13cff39 Mon Sep 17 00:00:00 2001
From: Firestarman
Date: Thu, 24 Sep 2020 14:42:51 +0800
Subject: [PATCH 2/2] Add tests for processing empty data.

The 'small_data' set is small enough that some tasks get no data when
running. For now this is only tested for the Scalar type, since it is
the one that implements the columnar pipeline.

Signed-off-by: Firestarman
---
 .../src/main/python/udf_cudf_test.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py
index c97ba94df1c..2e6093ea734 100644
--- a/integration_tests/src/main/python/udf_cudf_test.py
+++ b/integration_tests/src/main/python/udf_cudf_test.py
@@ -36,10 +36,13 @@
         'spark.rapids.sql.python.gpu.enabled': 'true'
         }
 
+small_data = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
 
-def _create_df(spark):
-    elements = list(map(lambda i: (i, i/1.0), range(1, 5000)))
-    return spark.createDataFrame(elements * 2, ("id", "v"))
+large_data = list(map(lambda i: (i, i/1.0), range(1, 5000))) * 2
+
+
+def _create_df(spark, data=large_data):
+    return spark.createDataFrame(data, ("id", "v"))
 
 
 # since this test requires to run different functions on CPU and GPU(need cudf),
@@ -76,13 +79,14 @@ def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
 
 
 @cudf_udf
-def test_with_column(enable_cudf_udf):
+@pytest.mark.parametrize('data', [small_data, large_data], ids=['small data', 'large data'])
+def test_with_column(enable_cudf_udf, data):
     def cpu_run(spark):
-        df = _create_df(spark)
+        df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()
 
     def gpu_run(spark):
-        df = _create_df(spark)
+        df = _create_df(spark, data)
         return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect()
 
     _assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)