Skip to content

Commit

Permalink
Move GpuWindowInPandasExec.scala in shims layers
Browse files Browse the repository at this point in the history
  • Loading branch information
NvTimLiu committed Sep 25, 2020
1 parent 79d23f7 commit 28c6428
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 12 deletions.
3 changes: 0 additions & 3 deletions integration_tests/src/main/python/udf_cudf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
from spark_session import with_cpu_session, with_gpu_session
from marks import allow_non_gpu, cudf_udf

pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'),
reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet")


_conf = {
'spark.rapids.sql.exec.MapInPandasExec':'true',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub, ShuffleManagerShimBase}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
import org.apache.spark.sql.rapids.shims.spark300._
Expand Down Expand Up @@ -176,7 +177,12 @@ class Spark300Shims extends SparkShims {
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[WindowInPandasExec](
"The backend for Pandas UDF with window functions, it runs on CPU itself now but supports" +
" running the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
.disabledByDefault("Performance is not ideal now")
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.rapids.execution.python
package org.apache.spark.sql.rapids.shims.spark300

import java.io.File

Expand All @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib
import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.window._
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, ShuffleManagerShimBase}
import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase
import org.apache.spark.sql.rapids.shims.spark310.{GpuInMemoryTableScanExec, ShuffleManagerShim}
import org.apache.spark.sql.rapids.shims.spark310._
import org.apache.spark.sql.types._
import org.apache.spark.storage.{BlockId, BlockManagerId}

Expand Down Expand Up @@ -166,7 +167,12 @@ class Spark310Shims extends Spark301Shims {
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[WindowInPandasExec](
"The backend for Pandas UDF with window functions, it runs on CPU itself now but supports" +
" running the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
.disabledByDefault("Performance is not ideal now")
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}

Expand Down
Loading

0 comments on commit 28c6428

Please sign in to comment.