
[BUG] Mortgage ETL sample failed with spark.sql.adaptive enabled on AWS EMR 6.2 #1423

Closed
mgzhao opened this issue Dec 17, 2020 · 7 comments
Labels
bug (Something isn't working), P0 (Must have for release)

Comments

@mgzhao
Contributor

mgzhao commented Dec 17, 2020

Describe the bug

The Mortgage ETL sample fails with spark.sql.adaptive enabled on AWS EMR 6.2 with the 0.3 release.
With "spark.sql.adaptive.enabled":"false", the same ETL sample runs to completion in an EMR PySpark notebook and gives us an end-to-end number.

With AQE on, the notebook reports the following error:

An error was encountered:
An error occurred while calling o757.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.rapids.GpuFileFormatWriter$.write(GpuFileFormatWriter.scala:250)
    at org.apache.spark.sql.rapids.GpuInsertIntoHadoopFsRelationCommand.runColumnar(GpuInsertIntoHadoopFsRelationCommand.scala:166)
    at com.nvidia.spark.rapids.GpuDataWritingCommandExec.sideEffectResult$lzycompute(GpuDataWritingCommandExec.scala:61)
    at com.nvidia.spark.rapids.GpuDataWritingCommandExec.sideEffectResult(GpuDataWritingCommandExec.scala:60)
    at com.nvidia.spark.rapids.GpuDataWritingCommandExec.doExecuteColumnar(GpuDataWritingCommandExec.scala:84)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuColumnarToRowExecParent.doExecute(GpuColumnarToRowExec.scala:301)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:848)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: Method com/nvidia/spark/rapids/shims/spark301/GpuBroadcastExchangeExec.org$apache$spark$sql$execution$exchange$BroadcastExchangeLike$$super$sparkContext()Lorg/apache/spark/SparkContext; is abstract
    at com.nvidia.spark.rapids.shims.spark301.GpuBroadcastExchangeExec.org$apache$spark$sql$execution$exchange$BroadcastExchangeLike$$super$sparkContext(GpuBroadcastExchangeExec.scala)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeLike.cancel(BroadcastExchangeExec.scala:105)
    at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec.cancel(QueryStageExec.scala:242)
    at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:366)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:284)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1$adapted(AdaptiveExecutor.scala:282)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1$adapted(AdaptiveExecutor.scala:282)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1$adapted(AdaptiveExecutor.scala:282)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:77)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:163)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:390)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at com.nvidia.spark.rapids.GpuRowToColumnarExec.doExecuteColumnar(GpuRowToColumnarExec.scala:811)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at org.apache.spark.sql.rapids.GpuFileFormatWriter$.write(GpuFileFormatWriter.scala:194)
    ... 45 more
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 936, in parquet
    self._jwrite.parquet(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o757.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.rapids.GpuFileFormatWriter$.write(GpuFileFormatWriter.scala:250)
    at org.apache.spark.sql.rapids.GpuInsertIntoHadoopFsRelationCommand.runColumnar(GpuInsertIntoHadoopFsRelationCommand.scala:166)
    at com.nvidia.spark.rapids.GpuDataWritingCommandExec.sideEffectResult$lzycompute(GpuDataWritingCommandExec.scala:61)
    at com.nvidia.spark.rapids.GpuDataWritingCommandExec.sideEffectResult(GpuDataWritingCommandExec.scala:60)
    at com.nvidia.spark.rapids.GpuDataWritingCommandExec.doExecuteColumnar(GpuDataWritingCommandExec.scala:84)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at com.nvidia.spark.rapids.GpuColumnarToRowExecParent.doExecute(GpuColumnarToRowExec.scala:301)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:848)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: Method com/nvidia/spark/rapids/shims/spark301/GpuBroadcastExchangeExec.org$apache$spark$sql$execution$exchange$BroadcastExchangeLike$$super$sparkContext()Lorg/apache/spark/SparkContext; is abstract
    at com.nvidia.spark.rapids.shims.spark301.GpuBroadcastExchangeExec.org$apache$spark$sql$execution$exchange$BroadcastExchangeLike$$super$sparkContext(GpuBroadcastExchangeExec.scala)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeLike.cancel(BroadcastExchangeExec.scala:105)
    at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec.cancel(QueryStageExec.scala:242)
    at org.apache.spark.sql.execution.adaptive.MaterializeExecutable.cancel(AdaptiveExecutable.scala:366)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:284)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1$adapted(AdaptiveExecutor.scala:282)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1$adapted(AdaptiveExecutor.scala:282)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.$anonfun$fail$1$adapted(AdaptiveExecutor.scala:282)
    at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:95)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutorRuntime.fail(AdaptiveExecutor.scala:282)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:77)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:163)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:390)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at com.nvidia.spark.rapids.GpuRowToColumnarExec.doExecuteColumnar(GpuRowToColumnarExec.scala:811)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:207)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:203)
    at org.apache.spark.sql.rapids.GpuFileFormatWriter$.write(GpuFileFormatWriter.scala:194)
    ... 45 more

Steps/Code to reproduce bug

Notebook code (please adjust the S3 locations for the dataset):

import time

start = time.time()

from pyspark import broadcast
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

def _get_quarter_from_csv_file_name():
    return substring_index(substring_index(input_file_name(), '.', 1), '_', -1)

_csv_perf_schema = StructType([
    StructField('loan_id', LongType()),
    StructField('monthly_reporting_period', StringType()),
    StructField('servicer', StringType()),
    StructField('interest_rate', DoubleType()),
    StructField('current_actual_upb', DoubleType()),
    StructField('loan_age', DoubleType()),
    StructField('remaining_months_to_legal_maturity', DoubleType()),
    StructField('adj_remaining_months_to_maturity', DoubleType()),
    StructField('maturity_date', StringType()),
    StructField('msa', DoubleType()),
    StructField('current_loan_delinquency_status', IntegerType()),
    StructField('mod_flag', StringType()),
    StructField('zero_balance_code', StringType()),
    StructField('zero_balance_effective_date', StringType()),
    StructField('last_paid_installment_date', StringType()),
    StructField('foreclosed_after', StringType()),
    StructField('disposition_date', StringType()),
    StructField('foreclosure_costs', DoubleType()),
    StructField('prop_preservation_and_repair_costs', DoubleType()),
    StructField('asset_recovery_costs', DoubleType()),
    StructField('misc_holding_expenses', DoubleType()),
    StructField('holding_taxes', DoubleType()),
    StructField('net_sale_proceeds', DoubleType()),
    StructField('credit_enhancement_proceeds', DoubleType()),
    StructField('repurchase_make_whole_proceeds', StringType()),
    StructField('other_foreclosure_proceeds', DoubleType()),
    StructField('non_interest_bearing_upb', DoubleType()),
    StructField('principal_forgiveness_upb', StringType()),
    StructField('repurchase_make_whole_proceeds_flag', StringType()),
    StructField('foreclosure_principal_write_off_amount', StringType()),
    StructField('servicing_activity_indicator', StringType())])
_csv_acq_schema = StructType([
    StructField('loan_id', LongType()),
    StructField('orig_channel', StringType()),
    StructField('seller_name', StringType()),
    StructField('orig_interest_rate', DoubleType()),
    StructField('orig_upb', IntegerType()),
    StructField('orig_loan_term', IntegerType()),
    StructField('orig_date', StringType()),
    StructField('first_pay_date', StringType()),
    StructField('orig_ltv', DoubleType()),
    StructField('orig_cltv', DoubleType()),
    StructField('num_borrowers', DoubleType()),
    StructField('dti', DoubleType()),
    StructField('borrower_credit_score', DoubleType()),
    StructField('first_home_buyer', StringType()),
    StructField('loan_purpose', StringType()),
    StructField('property_type', StringType()),
    StructField('num_units', IntegerType()),
    StructField('occupancy_status', StringType()),
    StructField('property_state', StringType()),
    StructField('zip', IntegerType()),
    StructField('mortgage_insurance_percent', DoubleType()),
    StructField('product_type', StringType()),
    StructField('coborrow_credit_score', DoubleType()),
    StructField('mortgage_insurance_type', DoubleType()),
    StructField('relocation_mortgage_indicator', StringType())])
_name_mapping = [
        ("WITMER FUNDING, LLC", "Witmer"),
        ("WELLS FARGO CREDIT RISK TRANSFER SECURITIES TRUST 2015", "Wells Fargo"),
        ("WELLS FARGO BANK,  NA" , "Wells Fargo"),
        ("WELLS FARGO BANK, N.A." , "Wells Fargo"),
        ("WELLS FARGO BANK, NA" , "Wells Fargo"),
        ("USAA FEDERAL SAVINGS BANK" , "USAA"),
        ("UNITED SHORE FINANCIAL SERVICES, LLC D\\/B\\/A UNITED WHOLESALE MORTGAGE" , "United Seq(e"),
        ("U.S. BANK N.A." , "US Bank"),
        ("SUNTRUST MORTGAGE INC." , "Suntrust"),
        ("STONEGATE MORTGAGE CORPORATION" , "Stonegate Mortgage"),
        ("STEARNS LENDING, LLC" , "Stearns Lending"),
        ("STEARNS LENDING, INC." , "Stearns Lending"),
        ("SIERRA PACIFIC MORTGAGE COMPANY, INC." , "Sierra Pacific Mortgage"),
        ("REGIONS BANK" , "Regions"),
        ("RBC MORTGAGE COMPANY" , "RBC"),
        ("QUICKEN LOANS INC." , "Quicken Loans"),
        ("PULTE MORTGAGE, L.L.C." , "Pulte Mortgage"),
        ("PROVIDENT FUNDING ASSOCIATES, L.P." , "Provident Funding"),
        ("PROSPECT MORTGAGE, LLC" , "Prospect Mortgage"),
        ("PRINCIPAL RESIDENTIAL MORTGAGE CAPITAL RESOURCES, LLC" , "Principal Residential"),
        ("PNC BANK, N.A." , "PNC"),
        ("PMT CREDIT RISK TRANSFER TRUST 2015-2" , "PennyMac"),
        ("PHH MORTGAGE CORPORATION" , "PHH Mortgage"),
        ("PENNYMAC CORP." , "PennyMac"),
        ("PACIFIC UNION FINANCIAL, LLC" , "Other"),
        ("OTHER" , "Other"),
        ("NYCB MORTGAGE COMPANY, LLC" , "NYCB"),
        ("NEW YORK COMMUNITY BANK" , "NYCB"),
        ("NETBANK FUNDING SERVICES" , "Netbank"),
        ("NATIONSTAR MORTGAGE, LLC" , "Nationstar Mortgage"),
        ("METLIFE BANK, NA" , "Metlife"),
        ("LOANDEPOT.COM, LLC" , "LoanDepot.com"),
        ("J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2015-1" , "JP Morgan Chase"),
        ("J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2014-1" , "JP Morgan Chase"),
        ("JPMORGAN CHASE BANK, NATIONAL ASSOCIATION" , "JP Morgan Chase"),
        ("JPMORGAN CHASE BANK, NA" , "JP Morgan Chase"),
        ("JP MORGAN CHASE BANK, NA" , "JP Morgan Chase"),
        ("IRWIN MORTGAGE, CORPORATION" , "Irwin Mortgage"),
        ("IMPAC MORTGAGE CORP." , "Impac Mortgage"),
        ("HSBC BANK USA, NATIONAL ASSOCIATION" , "HSBC"),
        ("HOMEWARD RESIDENTIAL, INC." , "Homeward Mortgage"),
        ("HOMESTREET BANK" , "Other"),
        ("HOMEBRIDGE FINANCIAL SERVICES, INC." , "HomeBridge"),
        ("HARWOOD STREET FUNDING I, LLC" , "Harwood Mortgage"),
        ("GUILD MORTGAGE COMPANY" , "Guild Mortgage"),
        ("GMAC MORTGAGE, LLC (USAA FEDERAL SAVINGS BANK)" , "GMAC"),
        ("GMAC MORTGAGE, LLC" , "GMAC"),
        ("GMAC (USAA)" , "GMAC"),
        ("FREMONT BANK" , "Fremont Bank"),
        ("FREEDOM MORTGAGE CORP." , "Freedom Mortgage"),
        ("FRANKLIN AMERICAN MORTGAGE COMPANY" , "Franklin America"),
        ("FLEET NATIONAL BANK" , "Fleet National"),
        ("FLAGSTAR CAPITAL MARKETS CORPORATION" , "Flagstar Bank"),
        ("FLAGSTAR BANK, FSB" , "Flagstar Bank"),
        ("FIRST TENNESSEE BANK NATIONAL ASSOCIATION" , "Other"),
        ("FIFTH THIRD BANK" , "Fifth Third Bank"),
        ("FEDERAL HOME LOAN BANK OF CHICAGO" , "Fedral Home of Chicago"),
        ("FDIC, RECEIVER, INDYMAC FEDERAL BANK FSB" , "FDIC"),
        ("DOWNEY SAVINGS AND LOAN ASSOCIATION, F.A." , "Downey Mortgage"),
        ("DITECH FINANCIAL LLC" , "Ditech"),
        ("CITIMORTGAGE, INC." , "Citi"),
        ("CHICAGO MORTGAGE SOLUTIONS DBA INTERFIRST MORTGAGE COMPANY" , "Chicago Mortgage"),
        ("CHICAGO MORTGAGE SOLUTIONS DBA INTERBANK MORTGAGE COMPANY" , "Chicago Mortgage"),
        ("CHASE HOME FINANCE, LLC" , "JP Morgan Chase"),
        ("CHASE HOME FINANCE FRANKLIN AMERICAN MORTGAGE COMPANY" , "JP Morgan Chase"),
        ("CHASE HOME FINANCE (CIE 1)" , "JP Morgan Chase"),
        ("CHASE HOME FINANCE" , "JP Morgan Chase"),
        ("CASHCALL, INC." , "CashCall"),
        ("CAPITAL ONE, NATIONAL ASSOCIATION" , "Capital One"),
        ("CALIBER HOME LOANS, INC." , "Caliber Funding"),
        ("BISHOPS GATE RESIDENTIAL MORTGAGE TRUST" , "Bishops Gate Mortgage"),
        ("BANK OF AMERICA, N.A." , "Bank of America"),
        ("AMTRUST BANK" , "AmTrust"),
        ("AMERISAVE MORTGAGE CORPORATION" , "Amerisave"),
        ("AMERIHOME MORTGAGE COMPANY, LLC" , "AmeriHome Mortgage"),
        ("ALLY BANK" , "Ally Bank"),
        ("ACADEMY MORTGAGE CORPORATION" , "Academy Mortgage"),
        ("NO CASH-OUT REFINANCE" , "OTHER REFINANCE"),
        ("REFINANCE - NOT SPECIFIED" , "OTHER REFINANCE"),
        ("Other REFINANCE" , "OTHER REFINANCE")]

cate_col_names = [
        "orig_channel",
        "first_home_buyer",
        "loan_purpose",
        "property_type",
        "occupancy_status",
        "property_state",
        "relocation_mortgage_indicator",
        "seller_name",
        "mod_flag"
]
# Numeric columns
label_col_name = "delinquency_12"
numeric_col_names = [
        "orig_interest_rate",
        "orig_upb",
        "orig_loan_term",
        "orig_ltv",
        "orig_cltv",
        "num_borrowers",
        "dti",
        "borrower_credit_score",
        "num_units",
        "zip",
        "mortgage_insurance_percent",
        "current_loan_delinquency_status",
        "current_actual_upb",
        "interest_rate",
        "loan_age",
        "msa",
        "non_interest_bearing_upb",
        label_col_name
]
all_col_names = cate_col_names + numeric_col_names

def read_perf_csv(spark, path):
    return spark.read.format('csv') \
            .option('nullValue', '') \
            .option('header', 'false') \
            .option('delimiter', '|') \
            .schema(_csv_perf_schema) \
            .load(path) \
            .withColumn('quarter', _get_quarter_from_csv_file_name())

def read_acq_csv(spark, path):
    return spark.read.format('csv') \
            .option('nullValue', '') \
            .option('header', 'false') \
            .option('delimiter', '|') \
            .schema(_csv_acq_schema) \
            .load(path) \
            .withColumn('quarter', _get_quarter_from_csv_file_name())

def _parse_dates(perf):
    return perf \
            .withColumn('monthly_reporting_period', to_date(col('monthly_reporting_period'), 'MM/dd/yyyy')) \
            .withColumn('monthly_reporting_period_month', month(col('monthly_reporting_period'))) \
            .withColumn('monthly_reporting_period_year', year(col('monthly_reporting_period'))) \
            .withColumn('monthly_reporting_period_day', dayofmonth(col('monthly_reporting_period'))) \
            .withColumn('last_paid_installment_date', to_date(col('last_paid_installment_date'), 'MM/dd/yyyy')) \
            .withColumn('foreclosed_after', to_date(col('foreclosed_after'), 'MM/dd/yyyy')) \
            .withColumn('disposition_date', to_date(col('disposition_date'), 'MM/dd/yyyy')) \
            .withColumn('maturity_date', to_date(col('maturity_date'), 'MM/yyyy')) \
            .withColumn('zero_balance_effective_date', to_date(col('zero_balance_effective_date'), 'MM/yyyy'))

def _create_perf_deliquency(spark, perf):
    aggDF = perf.select(
            col("quarter"),
            col("loan_id"),
            col("current_loan_delinquency_status"),
            when(col("current_loan_delinquency_status") >= 1, col("monthly_reporting_period")).alias("delinquency_30"),
            when(col("current_loan_delinquency_status") >= 3, col("monthly_reporting_period")).alias("delinquency_90"),
            when(col("current_loan_delinquency_status") >= 6, col("monthly_reporting_period")).alias("delinquency_180")) \
                    .groupBy("quarter", "loan_id") \
                    .agg(
                            max("current_loan_delinquency_status").alias("delinquency_12"),
                            min("delinquency_30").alias("delinquency_30"),
                            min("delinquency_90").alias("delinquency_90"),
                            min("delinquency_180").alias("delinquency_180")) \
                                    .select(
                                            col("quarter"),
                                            col("loan_id"),
                                            (col("delinquency_12") >= 1).alias("ever_30"),
                                            (col("delinquency_12") >= 3).alias("ever_90"),
                                            (col("delinquency_12") >= 6).alias("ever_180"),
                                            col("delinquency_30"),
                                            col("delinquency_90"),
                                            col("delinquency_180"))
    joinedDf = perf \
            .withColumnRenamed("monthly_reporting_period", "timestamp") \
            .withColumnRenamed("monthly_reporting_period_month", "timestamp_month") \
            .withColumnRenamed("monthly_reporting_period_year", "timestamp_year") \
            .withColumnRenamed("current_loan_delinquency_status", "delinquency_12") \
            .withColumnRenamed("current_actual_upb", "upb_12") \
            .select("quarter", "loan_id", "timestamp", "delinquency_12", "upb_12", "timestamp_month", "timestamp_year") \
            .join(aggDF, ["loan_id", "quarter"], "left_outer")
    # calculate the 12 month delinquency and upb values
    months = 12
    monthArray = [lit(x) for x in range(0, 12)]
    # explode on a small amount of data is actually slightly more efficient than a cross join
    testDf = joinedDf \
            .withColumn("month_y", explode(array(monthArray))) \
            .select(
                    col("quarter"),
                    floor(((col("timestamp_year") * 12 + col("timestamp_month")) - 24000) / months).alias("josh_mody"),
                    floor(((col("timestamp_year") * 12 + col("timestamp_month")) - 24000 - col("month_y")) / months).alias("josh_mody_n"),
                    col("ever_30"),
                    col("ever_90"),
                    col("ever_180"),
                    col("delinquency_30"),
                    col("delinquency_90"),
                    col("delinquency_180"),
                    col("loan_id"),
                    col("month_y"),
                    col("delinquency_12"),
                    col("upb_12")) \
                            .groupBy("quarter", "loan_id", "josh_mody_n", "ever_30", "ever_90", "ever_180", "delinquency_30", "delinquency_90", "delinquency_180", "month_y") \
                            .agg(max("delinquency_12").alias("delinquency_12"), min("upb_12").alias("upb_12")) \
                            .withColumn("timestamp_year", floor((lit(24000) + (col("josh_mody_n") * lit(months)) + (col("month_y") - 1)) / lit(12))) \
                            .selectExpr('*', 'pmod(24000 + (josh_mody_n * {}) + month_y, 12) as timestamp_month_tmp'.format(months)) \
                            .withColumn("timestamp_month", when(col("timestamp_month_tmp") == lit(0), lit(12)).otherwise(col("timestamp_month_tmp"))) \
                            .withColumn("delinquency_12", ((col("delinquency_12") > 3).cast("int") + (col("upb_12") == 0).cast("int")).alias("delinquency_12")) \
                            .drop("timestamp_month_tmp", "josh_mody_n", "month_y")
    return perf.withColumnRenamed("monthly_reporting_period_month", "timestamp_month") \
            .withColumnRenamed("monthly_reporting_period_year", "timestamp_year") \
            .join(testDf, ["quarter", "loan_id", "timestamp_year", "timestamp_month"], "left") \
            .drop("timestamp_year", "timestamp_month")

def _create_acquisition(spark, acq):
    nameMapping = spark.createDataFrame(_name_mapping, ["from_seller_name", "to_seller_name"])
    return acq.join(nameMapping, col("seller_name") == col("from_seller_name"), "left") \
      .drop("from_seller_name") \
      .withColumn("old_name", col("seller_name")) \
      .withColumn("seller_name", coalesce(col("to_seller_name"), col("seller_name"))) \
      .drop("to_seller_name") \
      .withColumn("orig_date", to_date(col("orig_date"), "MM/yyyy")) \
      .withColumn("first_pay_date", to_date(col("first_pay_date"), "MM/yyyy"))

def _gen_dictionary(etl_df, col_names):
    cnt_table = etl_df.select(posexplode(array([col(i) for i in col_names])))\
                    .withColumnRenamed("pos", "column_id")\
                    .withColumnRenamed("col", "data")\
                    .filter("data is not null")\
                    .groupBy("column_id", "data")\
                    .count()
    windowed = Window.partitionBy("column_id").orderBy(desc("count"))
    return cnt_table.withColumn("id", row_number().over(windowed)).drop("count")


def _cast_string_columns_to_numeric(spark, input_df):
    cached_dict_df = _gen_dictionary(input_df, cate_col_names).cache()
    output_df = input_df
    #  Generate the final table with all columns being numeric.
    for col_pos, col_name in enumerate(cate_col_names):
        col_dict_df = cached_dict_df.filter(col("column_id") == col_pos)\
                                    .drop("column_id")\
                                    .withColumnRenamed("data", col_name)
        
        output_df = output_df.join(broadcast(col_dict_df), col_name, "left")\
                        .drop(col_name)\
                        .withColumnRenamed("id", col_name)
    return output_df

def run_mortgage(spark, perf, acq):
    parsed_perf = _parse_dates(perf)
    perf_deliqency = _create_perf_deliquency(spark, parsed_perf)
    cleaned_acq = _create_acquisition(spark, acq)
    df = perf_deliqency.join(cleaned_acq, ["loan_id", "quarter"], "inner")
    # change to this for 20 year mortgage data - test_quarters = ['2016Q1','2016Q2','2016Q3','2016Q4']
    test_quarters = ['2000Q4']
    train_df = df.filter(df.quarter.isin(test_quarters)).drop("quarter")
    test_df = df.filter(df.quarter.isin(test_quarters)).drop("quarter")
    casted_train_df = _cast_string_columns_to_numeric(spark, train_df)\
                    .select(all_col_names)\
                    .withColumn(label_col_name, when(col(label_col_name) > 0, 1).otherwise(0))\
                    .fillna(float(0))
    casted_test_df = _cast_string_columns_to_numeric(spark, test_df)\
                    .select(all_col_names)\
                    .withColumn(label_col_name, when(col(label_col_name) > 0, 1).otherwise(0))\
                    .fillna(float(0))
    return casted_train_df, casted_test_df

orig_perf_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage-dataset/perf/Performance_20*'
orig_acq_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage-dataset/acq/Acquisition_20*'

#orig_perf_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage-etl-demo/perf/Performance_2000Q*'
#orig_acq_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage-etl-demo/acq/Acquisition_2000Q*'


train_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage-xgboost-demo/train/'
test_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage-xgboost-demo/test/'

tmp_perf_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage_parquet_gpu/perf/'
tmp_acq_path = 's3://spark-xgboost-mortgage-dataset-east1/mortgage_parquet_gpu/acq/'


# Let's transcode the data first
#start = time.time()
# we want a few big files instead of lots of small files
spark.conf.set('spark.sql.files.maxPartitionBytes', '1G')
acq = read_acq_csv(spark, orig_acq_path)
acq.repartition(20).write.parquet(tmp_acq_path, mode='overwrite')
perf = read_perf_csv(spark, orig_perf_path)
perf.coalesce(80).write.parquet(tmp_perf_path, mode='overwrite')
#end = time.time()
#print(end - start)

# Now let's actually process the data
#start = time.time()
#spark.conf.set('spark.sql.files.maxPartitionBytes', '1G')
spark.conf.set('spark.sql.shuffle.partitions', '160')
perf = spark.read.parquet(tmp_perf_path)
acq = spark.read.parquet(tmp_acq_path)
train_out, test_out = run_mortgage(spark, perf, acq)
train_out.write.parquet(train_path, mode='overwrite')
#end = time.time()
#print(end - start)
test_out.write.parquet(test_path, mode='overwrite')
end = time.time()
print(end - start)

Expected behavior

The Mortgage ETL example should complete without issue and report an end-to-end time of around 620 seconds.

Environment details (please complete the following information)

  • Environment location: AWS EMR, 1x m5.xlarge master node and 2x g4dn.12xlarge core nodes. EMR 6.2 release, with the spark-rapids jar manually replaced with the v0.3 release (/usr/share/aws/emr/spark-rapids/lib)

  • Spark configuration settings related to the issue

You can manually turn off AQE by editing spark-defaults.conf on the master node:
sudo vi /etc/spark/conf/spark-defaults.conf
spark.sql.adaptive.enabled false
# restart YARN on the master node
sudo systemctl restart hadoop-yarn-resourcemanager.service
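
The same setting can also be applied per Spark session from the PySpark notebook, without editing spark-defaults.conf and restarting YARN. A minimal sketch, assuming the standard spark session object that the EMR notebook provides (the cluster-wide spark-defaults.conf change above remains the persistent fix):

# Disable AQE for this session only
spark.conf.set('spark.sql.adaptive.enabled', 'false')
# Confirm the runtime value before re-running the ETL cells
print(spark.conf.get('spark.sql.adaptive.enabled'))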

EMR cluster configuration

--configurations '[{"Classification":"spark","Properties":{"enableSparkRapids":"true"}},{"Classification":"yarn-site","Properties":{"yarn.nodemanager.linux-container-executor.cgroups.mount":"true","yarn.nodemanager.linux-container-executor.cgroups.mount-path":"/sys/fs/cgroup","yarn.nodemanager.resource-plugins.gpu.path-to-discovery-executables":"/usr/bin","yarn.nodemanager.linux-container-executor.cgroups.hierarchy":"yarn","yarn.nodemanager.container-executor.class":"org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor","yarn.resource-types":"yarn.io/gpu","yarn.nodemanager.resource-plugins":"yarn.io/gpu","yarn.nodemanager.resource-plugins.gpu.allowed-gpu-devices":"auto"}},{"Classification":"container-executor","Properties":{},"Configurations":[{"Classification":"gpu","Properties":{"module.enabled":"true"}},{"Classification":"cgroups","Properties":{"root":"/sys/fs/cgroup","yarn-hierarchy":"yarn"}}]},{"Classification":"spark-defaults","Properties":{"spark.task.cpus ":"1","spark.submit.pyFiles":"/usr/lib/spark/jars/xgboost4j-spark_3.0-1.0.0-0.2.0.jar","spark.executor.extraLibraryPath":"/usr/local/cuda/targets/x86_64-linux/lib:/usr/local/cuda/extras/CUPTI/lib64:/usr/local/cuda/compat/lib:/usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native","spark.plugins":"com.nvidia.spark.SQLPlugin","spark.executor.cores":"12","spark.sql.files.maxPartitionBytes":"256m","spark.executor.resource.gpu.discoveryScript":"/usr/lib/spark/scripts/gpu/getGpusResources.sh","spark.sql.shuffle.partitions":"200","spark.task.resource.gpu.amount":"0.0833","spark.rapids.memory.pinnedPool.size":"2G","spark.executor.resource.gpu.amount":"1","spark.sql.adaptive.enabled":"false","spark.locality.wait":"0s","spark.sql.sources.useV1SourceList":"","spark.executor.memoryOverhead":"2G","spark.executor.extraJavaOptions":"-Dai.rapids.cudf.prefer-pinned=true","spark.rapids.sql.concurrentGpuTasks":"2"}},{"Classification":"capacity-scheduler","Properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}}]'

Additional context
I believe I ran into the same issue with v0.2 on EMR, so we turn AQE off when bringing up the cluster.

@mgzhao mgzhao added ? - Needs Triage Need team to review and classify bug Something isn't working labels Dec 17, 2020
@sameerz sameerz added P0 Must have for release and removed ? - Needs Triage Need team to review and classify labels Dec 17, 2020
@sameerz sameerz added this to the Dec 7 - Dec 18 milestone Dec 17, 2020
@tgravescs
Collaborator

It looks like the EMR definition of org.apache.spark.sql.execution.exchange.BroadcastExchangeLike extends another class, so we can try to shim this.

@tgravescs
Collaborator

tgravescs commented Dec 19, 2020

This happens in other places besides EMR.

Caused by: java.lang.IllegalStateException: Join needs to run on CPU but at least one input query stage ran on GPU
        at com.nvidia.spark.rapids.SparkPlanMeta.makeShuffleConsistent(RapidsMeta.scala:526)
        at com.nvidia.spark.rapids.SparkPlanMeta.fixUpJoinConsistencyIfNeeded(RapidsMeta.scala:541)
        at com.nvidia.spark.rapids.SparkPlanMeta.$anonfun$fixUpJoinConsistencyIfNeeded$1(RapidsMeta.scala:538)
        at com.nvidia.spark.rapids.SparkPlanMeta.$anonfun$fixUpJoinConsistencyIfNeeded$1$adapted(RapidsMeta.scala:538)

I think I tracked down the problem. In fixUpExchangeOverhead we check to make sure everything can be replaced, but there is one odd corner case where we remove the SortExec when replacing a SortMergeJoin with a ShuffleHashJoin. This shows up as:

20/12/19 00:40:02 WARN GpuShuffleMeta: parent can be replaced is not empty Some(*Exec could run on GPU but is going to be removed because removing SortExec as part replacing sortMergeJoin with shuffleHashJoin

This causes us not to mark the GpuColumnarExchange as "will not work", even though the child wouldn't run on the GPU in the initial planning, and so the exchange is not tagged. With AQE on, that subquery ends up being re-evaluated, and at that point we do properly change the GpuColumnarExchange to a CPU exchange; but since it wasn't tagged, the other side of the join subquery doesn't know this and runs on the GPU. We then have a mismatch and get the error that one side is on the CPU and one is on the GPU.

In this case it is the parent node, not the child; the converted plan looks like:

  :     +- GpuShuffledHashJoin [quarter#317, loan_id#286L, cast(timestamp_year#1162 as bigint), cast(timestamp_month#1126 as bigint)], [quarter#1229, loan_id#1198L, timestamp_year#1052L, timestamp_month#1081L], LeftOuter, BuildRight, false
   :        :- GpuColumnarExchange gpuhashpartitioning(quarter#317, loan_id#286L, cast(timestamp_year#1162 as bigint), cast(timestamp_month#1126 as bigint), 192), true, [id=#221]
   :        :  +- Project [loan_id#286L, cast(cast(unix_timestamp(monthly_reporting_period#287, MM/dd/yyyy, Some(UTC)) as timestamp) as date) AS monthly_reporting_period#402, servicer#288, interest_rate#289, current_actual_upb#290, loan_age#291, remaining_months_to_legal_maturity#292, adj_remaining_months_to_maturity#293, cast(cast(unix_timestamp(maturity_date#294, MM/yyyy, Some(UTC)) as timestamp) as date) AS maturity_date#648, msa#295, current_loan_delinquency_status#296, mod_flag#297, zero_balance_code#298, cast(cast(unix_timestamp(zero_balance_effective_date#299, MM/yyyy, Some(UTC)) as timestamp) as date) AS zero_balance_effective_date#684, cast(cast(unix_timestamp(last_paid_installment_date#300, MM/dd/yyyy, Some(UTC)) as timestamp) as date) AS last_paid_installment_date#540, cast(cast(unix_timestamp(foreclosed_after#301, MM/dd/yyyy, Some(UTC)) as timestamp) as date) AS foreclosed_after#576, cast(cast(unix_timestamp(disposition_date#302, MM/dd/yyyy, Some(UTC)) as timestamp) as date) AS disposition_date#612, foreclosure_costs#303, prop_preservation_and_repair_costs#304, asset_recovery_costs#305, misc_holding_expenses#306, holding_taxes#307, net_sale_proceeds#308, credit_enhancement_proceeds#309, ... 11 more fields]
   :        :     +- GpuFilter (gpuisnotnull(loan_id#286L) AND gpuisnotnull(quarter#317))
   :        :        +- GpuBatchScan[loan_id#286L, monthly_reporting_period#287, servicer#288, interest_

*Exec could run on GPU
*Exec could run on GPU but is going to be removed because removing SortExec as part replacing sortMergeJoin with shuffleHashJoin

So I think we just need to add a check to handle this special case.

I did try a quick hack change to see if it would pass; it got past that error but failed later with:
Caused by: scala.MatchError: BroadcastQueryStage 6

So I'm not sure whether there are further things broken as well.

@tgravescs
Collaborator

tgravescs commented Jan 4, 2021

A correction to the above comment: fixUpExchangeOverhead checks whether both of the following hold:

(parent.filter(_.canThisBeReplaced).isEmpty &&
        childPlans.filter(_.canThisBeReplaced).isEmpty))

In this case the condition holds for the child but not for the parent, so the exchange is not marked as "will not work". But when AQE kicks in and looks only at the subquery, I believe it sees only the child, so the exchange does get marked as won't work; the other side of the join still goes on the GPU, and we end up with the mismatch.

@tgravescs
Collaborator

So just to clarify, there are two issues here:

  1. An EMR-specific issue: we need a shim for GpuBroadcastExchangeExec because BroadcastExchangeLike is different in EMR.
  2. Once you fix issue 1 on EMR you hit the other issue, which is generic to AQE: our fixUpExchangeOverhead function requires both the parent and the child to not be on the GPU before removing the shuffle. Because of the way AQE handles its subqueries, after the initial pass it only looks at things after the exchange. In our initial pass the parent is on the GPU, so we don't mark that exchange as "will not work on GPU"; later, when AQE runs the subquery, there is no parent and the child isn't on the GPU, so the exchange is marked as won't work and put back on the CPU. We then end up with the exchange on one side of the join on the GPU and on the other side on the CPU.

@tgravescs
Collaborator

Note that with the AQE fix, issue 1 doesn't show up in the mortgage runs; I believe it will only show up on failures, at least in this scenario.

@tgravescs
Collaborator

For the other issue, the incompatibility with EMR, we are going to assume it won't be a problem with the officially released EMR jar, because they build against their own jars. So for now the fix for the AQE issue will resolve this.

@tgravescs
Collaborator

#1439
