Move GpuWindowInPandasExec into the shim layers #13

Closed · wants to merge 1 commit
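
In short, as the diffs below show: the change moves GpuWindowInPandasExec out of the shared org.apache.spark.sql.rapids.execution.python package and into the per-Spark-version shim packages, registers a WindowInPandasExec override (disabled by default for performance) in both Spark300Shims and Spark310Shims, and removes the Spark 3.1.0+ skip from the cuDF UDF integration tests.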

integration_tests/src/main/python/udf_cudf_test.py (0 additions & 5 deletions)

@@ -15,17 +15,12 @@
 import pandas as pd
 import pytest
 import time
-from distutils.version import LooseVersion
 from typing import Iterator
 from pyspark.sql import Window
 from pyspark.sql.functions import pandas_udf, PandasUDFType
-from spark_init_internal import spark_version
 from spark_session import with_cpu_session, with_gpu_session
 from marks import allow_non_gpu, cudf_udf
 
-pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'),
-                                reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet")
-
 
 _conf = {
     'spark.rapids.sql.exec.MapInPandasExec':'true',
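
With the version gate removed, the cuDF UDF tests are expected to run against Spark 3.1.0+ as well; the Spark310Shims changes further down register the matching WindowInPandasExec override for that release line.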

Spark300Shims.scala

@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.execution.python.WindowInPandasExec
 import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, GpuTimeSub, ShuffleManagerShimBase}
 import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase}
 import org.apache.spark.sql.rapids.shims.spark300._

@@ -176,7 +177,12 @@ class Spark300Shims extends SparkShims {
       (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
     GpuOverrides.exec[ShuffledHashJoinExec](
       "Implementation of join using hashed shuffled data",
-      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
+      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)),
+    GpuOverrides.exec[WindowInPandasExec](
+      "The backend for Pandas UDFs with window functions. It currently runs on the CPU itself, but it" +
+        " supports running the Python UDF code on the GPU when the UDF calls cuDF APIs",
+      (winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
+        .disabledByDefault("Performance is not ideal now")
   ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
 }
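
Because the new rule is registered with disabledByDefault, the override stays off until a user opts in. Below is a minimal spark-shell sketch of opting in; the config key is an assumption, following the spark.rapids.sql.exec.<ExecName> pattern visible in udf_cudf_test.py above, and com.nvidia.spark.SQLPlugin is the usual spark-rapids plugin class:

    import org.apache.spark.sql.SparkSession

    // Sketch only: opting in to the disabled-by-default override.
    // ASSUMPTION: the key follows the spark.rapids.sql.exec.<ExecName>
    // pattern used in udf_cudf_test.py.
    val spark = SparkSession.builder()
      .appName("pandas-window-udf-on-gpu")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.sql.exec.WindowInPandasExec", "true")
      .getOrCreate()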

GpuWindowInPandasExec.scala (moved into the spark300 shim package)

@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.rapids.execution.python
+package org.apache.spark.sql.rapids.shims.spark300
 
 import java.io.File
 
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistrib
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.rapids.execution.python._
 import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
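
Note the paired change here: once the file leaves the shared org.apache.spark.sql.rapids.execution.python package for the spark300 shim package, it gains a wildcard import of its former package, presumably to keep reaching the shared GPU Python-execution support classes it previously lived beside.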

Spark310Shims.scala

@@ -38,10 +38,11 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.execution.python.WindowInPandasExec
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, ShuffleManagerShimBase}
 import org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinExecBase
-import org.apache.spark.sql.rapids.shims.spark310.{GpuInMemoryTableScanExec, ShuffleManagerShim}
+import org.apache.spark.sql.rapids.shims.spark310._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 

Review thread on the widened import (import org.apache.spark.sql.rapids.shims.spark310._):

  Reviewer: Any reason to change this line?
  Owner/author: Added the new file spark310/src/main/scala/org/apache/spark/sql/rapids/shims/spark310/GpuWindowInPandasExec.scala, which the wildcard import now covers.

@@ -166,7 +167,12 @@ class Spark310Shims extends Spark301Shims {
       (join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
     GpuOverrides.exec[ShuffledHashJoinExec](
       "Implementation of join using hashed shuffled data",
-      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
+      (join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)),
+    GpuOverrides.exec[WindowInPandasExec](
+      "The backend for Pandas UDFs with window functions. It currently runs on the CPU itself, but it" +
+        " supports running the Python UDF code on the GPU when the UDF calls cuDF APIs",
+      (winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
+        .disabledByDefault("Performance is not ideal now")
   ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
 }
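
For readers new to the shims layout, here is a small self-contained sketch of the pattern both hunks follow. Every name in it is invented for illustration and is not the spark-rapids API: each supported Spark release hosts its own copy of the version-sensitive exec in its shim package, and a runtime version check selects the right one.

    // Self-contained illustration of the shim-layer pattern; all names
    // in this sketch are invented, not the real spark-rapids interfaces.
    object ShimSketch {
      trait WindowShim {
        // Fully qualified name of the version-specific exec this shim provides.
        def execClassName: String
      }

      object Spark300WindowShim extends WindowShim {
        val execClassName =
          "org.apache.spark.sql.rapids.shims.spark300.GpuWindowInPandasExec"
      }

      object Spark310WindowShim extends WindowShim {
        val execClassName =
          "org.apache.spark.sql.rapids.shims.spark310.GpuWindowInPandasExec"
      }

      // Pick the shim matching the running Spark version, as the plugin
      // does when it builds its override map per release line.
      def shimFor(sparkVersion: String): WindowShim =
        if (sparkVersion.startsWith("3.1")) Spark310WindowShim
        else Spark300WindowShim
    }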
