[BUG] Exception calling collect() when partitioning with arrays with null values using array_union(...) #5957

Closed

NVnavkumar opened this issue Jul 6, 2022 · 1 comment

Labels: bug (Something isn't working), P1 (Nice to have for release)

NVnavkumar (Collaborator) commented:

Describe the bug
This is a corner case specific to a couple of things: 1) using array_union(a, b) where a is empty or very small compared to b, and 2) b contains null values. Note that if the arguments are reversed, the exception does not occur.

Steps/Code to reproduce bug

  1. Generate a Parquet file with one field, a, of type ArrayType(<elementType>, true). The element type can be any simple type such as boolean, integer, long, or string, and even a single row will work. You can use the attached file for your convenience, or generate an equivalent file yourself (see the sketch below).

array_test_data.zip
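
If the attachment is unavailable, a minimal sketch for generating an equivalent file in spark-shell follows. The string element type, the sample row, and the output path are assumptions for illustration, not from the issue:

// Sketch: one column "a" of type ArrayType(StringType, containsNull = true),
// with a single row whose array contains a null element (sample data assumed).
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("a", ArrayType(StringType, containsNull = true))))
val rows = Seq(Row(Seq("x", null, "y")))
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  .write.mode("overwrite").parquet("/tmp/array_test_data.parquet")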

  2. Using spark-shell with the plugin loaded:
scala> spark.conf.set("spark.rapids.sql.exec.CollectLimitExec", "true")

scala> val df = spark.read.parquet("/tmp/array_test_data.parquet")

scala> df.selectExpr("array_union(array(), a)").collect()

22/07/06 15:46:35 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 43)
java.lang.AssertionError:  value at 1 is null
	at ai.rapids.cudf.HostColumnVectorCore.assertsForGet(HostColumnVectorCore.java:230)
	at ai.rapids.cudf.HostColumnVectorCore.getUTF8(HostColumnVectorCore.java:364)
	at com.nvidia.spark.rapids.RapidsHostColumnVectorCore.getUTF8String(RapidsHostColumnVectorCore.java:183)
	at org.apache.spark.sql.vectorized.ColumnarArray.getUTF8String(ColumnarArray.java:148)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:350)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
22/07/06 15:46:35 WARN TaskSetManager: Lost task 0.0 in stage 25.0 (TID 43) (192.168.1.58 executor driver): java.lang.AssertionError:  value at 1 is null
	(stack trace identical to the executor error above)

22/07/06 15:46:35 ERROR TaskSetManager: Task 0 in stage 25.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 43) (192.168.1.58 executor driver): java.lang.AssertionError:  value at 1 is null
	(stack trace identical to the executor error above)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
  ... 47 elided
Caused by: java.lang.AssertionError:  value at 1 is null
  (stack trace identical to the executor error above)

Note that this currently only happens when calling collect(); if the output is written to Parquet or another format, the same exception is not thrown (see the sketch below).
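
For reference, a sketch of the two variants reported not to fail, assuming df is the DataFrame from the repro above (the output path is an assumption for illustration):

// Writing the result out instead of collecting it does not hit the assertion:
df.selectExpr("array_union(array(), a)")
  .write.mode("overwrite").parquet("/tmp/array_union_out.parquet")

// Per the description, reversing the arguments also avoids the exception:
df.selectExpr("array_union(a, array())").collect()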

Environment details (please complete the following information)

  • Environment location: Standalone
  • Configuration parameters (a sample launch command follows this list):
    • spark.rapids.sql.exec.CollectLimitExec=true
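
A typical way to launch spark-shell with the plugin and this setting enabled; the jar name and version are placeholders, not from the issue:

spark-shell \
  --jars rapids-4-spark_2.12-<version>.jar \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.rapids.sql.exec.CollectLimitExec=true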
NVnavkumar added the bug (Something isn't working) and ? - Needs Triage (Need team to review and classify) labels on Jul 6, 2022
sameerz added the P1 (Nice to have for release) label and removed ? - Needs Triage on Jul 12, 2022
NVnavkumar (Collaborator, Author) commented:

This issue has already been resolved by the upstream PR, so I will close this one out.
