diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index b80913aa64a..3ab11c6428b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -510,6 +510,8 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { private def insertColumnarFromGpu(plan: SparkPlan): SparkPlan = { if (plan.supportsColumnar && plan.isInstanceOf[GpuExec]) { GpuBringBackToHost(insertColumnarToGpu(plan)) + } else if (plan.isInstanceOf[ColumnarToRowTransition] && plan.isInstanceOf[GpuExec]) { + plan.withNewChildren(plan.children.map(insertColumnarToGpu)) } else { plan.withNewChildren(plan.children.map(insertColumnarFromGpu)) } diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala index 250a4fa904c..9e868454902 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -59,7 +59,7 @@ case class GpuAtomicCreateTableAsSelectExec( query: SparkPlan, properties: Map[String, String], writeOptions: CaseInsensitiveStringMap, - ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec { + ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { override def supportsColumnar: Boolean = false diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala index ee3402c4f12..d1f842a937a 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -62,7 +62,7 @@ case class GpuAtomicReplaceTableAsSelectExec( writeOptions: CaseInsensitiveStringMap, orCreate: Boolean, invalidateCache: (TableCatalog, Table, Identifier) => Unit) - extends TableWriteExecHelper with GpuExec { + extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { override def supportsColumnar: Boolean = false diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala index ddf9ce97b40..e76d9b7f58a 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -58,7 +58,7 @@ case class GpuAtomicCreateTableAsSelectExec( query: SparkPlan, tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, - ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec { + ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala index 1f00da4ad50..c5059046867 100644 --- a/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark321db/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -63,7 +63,7 @@ case class GpuAtomicReplaceTableAsSelectExec( writeOptions: CaseInsensitiveStringMap, orCreate: Boolean, invalidateCache: (TableCatalog, Table, Identifier) => Unit) - extends TableWriteExecHelper with GpuExec { + extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala index f4c1fb198a9..fd64ae0d568 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -58,7 +58,7 @@ case class GpuAtomicCreateTableAsSelectExec( query: SparkPlan, tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, - ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec { + ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala index 97e90b4a801..adedd9f9091 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -63,7 +63,7 @@ case class GpuAtomicReplaceTableAsSelectExec( writeOptions: CaseInsensitiveStringMap, orCreate: Boolean, invalidateCache: (TableCatalog, Table, Identifier) => Unit) - extends TableWriteExecHelper with GpuExec { + extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala index 28a6e5910d3..1918e979c44 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -55,7 +55,7 @@ case class GpuAtomicCreateTableAsSelectExec( query: SparkPlan, tableSpec: TableSpec, writeOptions: CaseInsensitiveStringMap, - ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec { + ifNotExists: Boolean) extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala index e3e5cd5a40b..8031e503fbe 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch @@ -60,7 +60,7 @@ case class GpuAtomicReplaceTableAsSelectExec( writeOptions: CaseInsensitiveStringMap, orCreate: Boolean, invalidateCache: (TableCatalog, Table, Identifier) => Unit) - extends TableWriteExecHelper with GpuExec { + extends TableWriteExecHelper with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala index 2f6869be7e4..2c7039d328c 100644 --- a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicCreateTableAsSelectExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TableSpec} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.ColumnarToRowTransition import org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec import org.apache.spark.sql.vectorized.ColumnarBatch @@ -50,7 +51,8 @@ case class GpuAtomicCreateTableAsSelectExec( query: LogicalPlan, tableSpec: TableSpec, writeOptions: Map[String, String], - ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec with GpuExec { + ifNotExists: Boolean) + extends V2CreateTableAsSelectBaseExec with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec) diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala index 9ac78288bef..cae386eeadf 100644 --- a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala +++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/rapids/GpuAtomicReplaceTableAsSelectExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TableSpec} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.ColumnarToRowTransition import org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec import org.apache.spark.sql.vectorized.ColumnarBatch @@ -55,7 +56,7 @@ case class GpuAtomicReplaceTableAsSelectExec( writeOptions: Map[String, String], orCreate: Boolean, invalidateCache: (TableCatalog, Table, Identifier) => Unit) - extends V2CreateTableAsSelectBaseExec with GpuExec { + extends V2CreateTableAsSelectBaseExec with GpuExec with ColumnarToRowTransition { val properties = CatalogV2Util.convertTableProperties(tableSpec)