After that, create a demo table and run a query:
spark.sql("""CREATE TABLE spark_catalog.default.demo (id bigint, data string) USING iceberg;""")
spark.sql("""INSERT INTO spark_catalog.default.demo VALUES (1, 'a'), (2, 'b'), (3, 'c');""")
spark.sql("""SELECT count(distinct id) FROM spark_catalog.default.demo""").show()
Error:
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeRowToColumnarBatchIterator.fillBatch(Unknown Source)
at com.nvidia.spark.rapids.UnsafeRowToColumnarBatchIterator.next(UnsafeRowToColumnarBatchIterator.java:122)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeRowToColumnarBatchIterator.next(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
at scala.Option.getOrElse(Option.scala:189)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
at scala.Option.getOrElse(Option.scala:189)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
at scala.Option.getOrElse(Option.scala:189)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.aggregateInputBatches(aggregate.scala:283)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:238)
at scala.Option.getOrElse(Option.scala:189)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
at com.nvidia.spark.rapids.ColumnarToRowIterator.$anonfun$fetchNextBatch$2(GpuColumnarToRowExec.scala:242)
at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
at com.nvidia.spark.rapids.ColumnarToRowIterator.withResource(GpuColumnarToRowExec.scala:188)
at com.nvidia.spark.rapids.ColumnarToRowIterator.fetchNextBatch(GpuColumnarToRowExec.scala:239)
at com.nvidia.spark.rapids.ColumnarToRowIterator.loadNextBatch(GpuColumnarToRowExec.scala:216)
at com.nvidia.spark.rapids.ColumnarToRowIterator.hasNext(GpuColumnarToRowExec.scala:256)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
We assumed that the data we receive would always be an UnsafeRow, even though the declared type passed in is InternalRow. Spark makes that assumption too in many places, for example with collect(). Can you read data from a very small Iceberg table and just call collect() on it? From reading the Spark code, you should hit the exact same error, just at a different location in the code.
We can fix it, but the simplest fix would slow down processing in general, so we need a way to decide dynamically whether to insert the UnsafeProjection or not.
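The dynamic decision described above could be sketched roughly as follows. This is an illustrative sketch against the public Spark catalyst APIs, not the plugin's actual code; the helper name `toUnsafe` is hypothetical:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: only pay for an UnsafeProjection when a row is NOT
// already an UnsafeRow (e.g. a GenericInternalRow from an Iceberg reader).
def toUnsafe(schema: StructType)(rows: Iterator[InternalRow]): Iterator[UnsafeRow] = {
  // Lazily created so the fast path (all UnsafeRow input) never builds it.
  lazy val proj = UnsafeProjection.create(schema)
  rows.map {
    case u: UnsafeRow => u      // already unsafe: pass through unchanged
    case r            => proj(r) // note: the projection reuses its output buffer,
                                 // so callers must copy if they retain the row
  }
}
```

The per-row pattern match is cheap, but whether it is cheap enough on the hot path (versus detecting the producing plan node once up front) is exactly the trade-off the comment above is weighing.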
Currently our plugin does not support Iceberg, so we want Iceberg reads to fall back to the CPU gracefully instead of failing.
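Until the fallback works, one session-level workaround is to disable the RAPIDS SQL plugin entirely so the query runs on the CPU. A minimal sketch using the documented `spark.rapids.sql.enabled` switch (this disables GPU acceleration for the whole session, not just the Iceberg scan):

```scala
// Force CPU execution for this session; the RAPIDS jar stays loaded but inactive.
spark.conf.set("spark.rapids.sql.enabled", "false")
spark.sql("SELECT count(distinct id) FROM spark_catalog.default.demo").show()
```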
Env:
Spark 3.1.1 standalone cluster with Hive integration
22.04 snapshot jars
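The environment above might be brought up with a spark-shell launch along these lines. This is a sketch: the master URL, jar file names, Iceberg version, and catalog type are assumptions, not taken from the report:

```shell
# Spark 3.1.1 standalone + Hive-backed Iceberg catalog + RAPIDS 22.04 snapshot (assumed paths)
spark-shell \
  --master spark://master:7077 \
  --jars rapids-4-spark_2.12-22.04.0-SNAPSHOT.jar,iceberg-spark-runtime-3.1_2.12-0.13.1.jar \
  --conf spark.plugins=com.nvidia.spark.SQLPlugin \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive
```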
Below is a simple reading test that fails with our plugin:
CPU Spark: