Verify Parquet columnar encryption is handled safely #5246
Comments
I'd like to have a try!
Verified on Spark 3.3. It seems like our plugin could decrypt correctly.
scala> val df = Seq((1, 2, 3)).toDF("one", "tWo", "THREE")
df: org.apache.spark.sql.DataFrame = [one: int, tWo: int ... 1 more field]
scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
| "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
| "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
| "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
scala> df.write.
| option("parquet.encryption.column.keys" , "keyA:one").
| option("parquet.encryption.footer.key" , "keyB").
| parquet("./tmp/parquet")
22/05/07 20:08:10 WARN GpuOverrides:
*Exec <DataWritingCommandExec> will run on GPU
*Output <InsertIntoHadoopFsRelationCommand> will run on GPU
! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
@Expression <AttributeReference> one#10 could run on GPU
@Expression <AttributeReference> tWo#11 could run on GPU
@Expression <AttributeReference> THREE#12 could run on GPU
scala> spark.read.parquet("./tmp/parquet").show
22/05/07 20:08:27 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(one#19 as string) AS one#28 will run on GPU
*Expression <Cast> cast(one#19 as string) will run on GPU
*Expression <Alias> cast(tWo#20 as string) AS tWo#29 will run on GPU
*Expression <Cast> cast(tWo#20 as string) will run on GPU
*Expression <Alias> cast(THREE#21 as string) AS THREE#30 will run on GPU
*Expression <Cast> cast(THREE#21 as string) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
| 1| 2| 3|
+---+---+-----+
I do not think this has been validated. It looks like this passed because the GPU was allowed to perform the Parquet write despite encryption being requested. The Parquet file was then written without encryption, which explains why the read passed. To test this properly, the file must be written by the CPU (i.e., either run without the RAPIDS Accelerator or disable it before performing the write). I'll file an issue to document that we're not properly preventing Parquet writes from being replaced when encryption is requested.
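For illustration, a minimal sketch of that CPU-side write (assuming the same session as above; spark.rapids.sql.enabled is the plugin's overall enable switch, so toggling it is one way to keep the write on the CPU while still testing the GPU read; the output path here is illustrative):

scala> spark.conf.set("spark.rapids.sql.enabled", "false") // CPU performs the encrypted write

scala> df.write.
     |   option("parquet.encryption.column.keys" , "keyA:one").
     |   option("parquet.encryption.footer.key" , "keyB").
     |   parquet("./tmp/parquet/encrypted")

scala> spark.conf.set("spark.rapids.sql.enabled", "true")  // re-enable the plugin before the read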
Thank you for your review @jlowe. I also found some mistakes in my verification.
scala> spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "true")
scala> spark.conf.set("spark.rapids.sql.format.parquet.enabled", "true")
scala> spark.read.parquet("./tmp/parquet").show
22/05/11 09:23:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:375) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:477) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:523) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:517) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:76) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/remziy/working/rapids/tmp/parquet/part-00000-2aaef34d-78ac-49ab-b8b7-7afda73b5669-c000.snappy.parquet; isDirectory=false; length=2575; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFooterForFileError(QueryExecutionErrors.scala:814) ~[spark-catalyst_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:490) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_292]
Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:588) ~[parquet-hadoop-1.12.2.jar:1.12.2]
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:776) ~[parquet-hadoop-1.12.2.jar:1.12.2]
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657) ~[parquet-hadoop-1.12.2.jar:1.12.2]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:53) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:44) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:484) ~[spark-sql_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_292]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_292]
22/05/11 09:23:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (remzi-desktop executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult:
22/05/11 09:23:30 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (remzi-desktop executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:70)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:527)
at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:125)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:168)
at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:209)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:209)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:553)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:538)
... 47 elided
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/remziy/working/rapids/tmp/parquet/part-00000-2aaef34d-78ac-49ab-b8b7-7afda73b5669-c000.snappy.parquet; isDirectory=false; length=2575; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
Caused by: org.apache.parquet.crypto.ParquetCryptoRuntimeException: Trying to read file with encrypted footer. No keys available
And just as you said, with spark.conf.set("spark.rapids.sql.format.parquet.write.enabled", "true") the GPU still performs the write, so the file is written without encryption and the read passes:
scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
| "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
| "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
| "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
scala> df.write.
| option("parquet.encryption.column.keys" , "keyA:one").
| option("parquet.encryption.footer.key" , "keyB").
| parquet("./tmp/parquet")
22/05/11 09:30:02 WARN GpuOverrides:
*Exec <DataWritingCommandExec> will run on GPU
*Output <InsertIntoHadoopFsRelationCommand> will run on GPU
! <LocalTableScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.LocalTableScanExec
@Expression <AttributeReference> one#10 could run on GPU
@Expression <AttributeReference> tWo#11 could run on GPU
@Expression <AttributeReference> THREE#12 could run on GPU
scala> spark.read.parquet("./tmp/parquet").show
22/05/11 09:30:18 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(one#19 as string) AS one#28 will run on GPU
*Expression <Cast> cast(one#19 as string) will run on GPU
*Expression <Alias> cast(tWo#20 as string) AS tWo#29 will run on GPU
*Expression <Cast> cast(tWo#20 as string) will run on GPU
*Expression <Alias> cast(THREE#21 as string) AS THREE#30 will run on GPU
*Expression <Cast> cast(THREE#21 as string) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
| 1| 2| 3|
+---+---+-----+
@jlowe
The first step is to verify that all the different ways Parquet could be encrypted are detected, with an exception thrown if we try to read on the GPU, to avoid the possibility of silent data corruption. I believe this is already covered, but we should verify there are no other ways the file could be encrypted beyond what is already tested.
I don't see a need for another config. If the user doesn't want Parquet to be read by the GPU, there's already the existing config. It might make sense to automatically fall back if certain Parquet encryption settings are present (e.g., when the user provides a secret key via a config), but I could see cases where only a few Parquet files (or maybe none!) require the secret key, and we would end up falling back for all Parquet reads. But maybe this is the best default setup for now until we have improved support for encrypted Parquet files.
Agreed, it would be good to refine any exception messages being thrown to inform the user how to avoid this via config settings.
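For illustration, a minimal sketch of that config-driven fallback check (the helper name is hypothetical, and the key list covers only the settings used in this thread; this is not plugin code):

import org.apache.hadoop.conf.Configuration

// Hypothetical helper: true when any Parquet encryption setting is present,
// in which case the plan could fall back to the CPU for Parquet scans.
def parquetEncryptionRequested(conf: Configuration): Boolean =
  Seq("parquet.crypto.factory.class",
      "parquet.encryption.kms.client.class",
      "parquet.encryption.key.list").exists(conf.get(_) != null)

As noted above, this is coarse: it would fall back for every Parquet read in the session even if only a few files (or none) are actually encrypted.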
@jlowe @GaryShen2008
scala> sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
| "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
scala> sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
| "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
scala> sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
| "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
scala> spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "true")
scala> spark.conf.set("spark.rapids.sql.format.parquet.enabled", "true")
scala> spark.read.
| option("parquet.encryption.column.keys" , "keyA:one").
| option("parquet.encryption.footer.key" , "keyB").
| parquet("./tmp/parquet/encrypted")
res5: org.apache.spark.sql.DataFrame = [one: int, tWo: int ... 1 more field]
scala> val df = spark.read.
| option("parquet.encryption.column.keys" , "keyA:one").
| option("parquet.encryption.footer.key" , "keyB").
| parquet("./tmp/parquet/encrypted")
df: org.apache.spark.sql.DataFrame = [one: int, tWo: int ... 1 more field]
scala> df.show
22/05/24 13:37:52 WARN GpuOverrides:
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
@Partitioning <SinglePartition$> could run on GPU
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(one#6 as string) AS one#19 will run on GPU
*Expression <Cast> cast(one#6 as string) will run on GPU
*Expression <Alias> cast(tWo#7 as string) AS tWo#20 will run on GPU
*Expression <Cast> cast(tWo#7 as string) will run on GPU
*Expression <Alias> cast(THREE#8 as string) AS THREE#21 will run on GPU
*Expression <Cast> cast(THREE#8 as string) will run on GPU
*Exec <FileSourceScanExec> will run on GPU
+---+---+-----+
|one|tWo|THREE|
+---+---+-----+
| 0| 2| 3|
+---+---+-----+
Since the footer is read on the CPU by the same Parquet code, I'm not surprised that the column names are retrieved properly. However, I do not see any way we can reliably produce the proper output if the columns are encrypted. I suspect your finding is an anomaly caused by the tiny amount of data and only encrypting one of the columns. For example, here's what I get for the same example, which is not quite correct:
This is even more apparent when using a file with more data. For example, I created an encrypted version of …
Yes. The GPU can run successfully but give a wrong answer, so it might be a little difficult to fall back to the CPU because no runtime error is thrown.
The footer is decryptable because we read it on the CPU. If we had dynamic, late fallback to a CPU reader, we should be able to detect from the footer information that there's an encrypted column and fall back in that case (either just for the encrypted columns being read, or for the entire read of that file).
If the footer is encrypted then it will have a magic number of PARE instead of PAR1. We might be able to detect that early on and skip those files. For individual columns, I think what you want to look for is … Sadly, there appears to be no good way to get access to this from the parsed parquet-mr API, especially because we have to interact with several different versions of it. I think the right thing to do is to remove all of the encryption configs from the Hadoop conf that is passed to our Parquet reader and then catch any exceptions that it throws. We can then wrap them in more informative exceptions explaining what is happening.
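A minimal sketch of that magic-number check (the helper name is hypothetical; real code would also need to handle files shorter than a valid Parquet footer):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}

// A Parquet file ends with a 4-byte footer length followed by a 4-byte magic,
// so the last four bytes are "PAR1" for plaintext footers and "PARE" for
// encrypted footers. Hypothetical helper, not plugin code.
def hasEncryptedFooter(fs: FileSystem, path: Path): Boolean = {
  val in = fs.open(path)
  try {
    in.seek(fs.getFileStatus(path).getLen - 4)
    val magic = new Array[Byte](4)
    in.readFully(magic)
    new String(magic, StandardCharsets.US_ASCII) == "PARE"
  } finally {
    in.close()
  }
}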
So I think we do want the check for encryption in 22.06, as Bobby recommended, so that we don't read bogus data and the user is thrown a useful error message.
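For illustration, one shape that check could take (a sketch; the wrapper name, type, and message are assumptions, not the merged implementation):

import org.apache.parquet.crypto.ParquetCryptoRuntimeException

// Hypothetical wrapper: run the CPU footer read and translate a crypto
// failure into an actionable message instead of risking bogus data.
def readFooterOrExplain[T](path: String)(readFooter: => T): T =
  try {
    readFooter
  } catch {
    case e: ParquetCryptoRuntimeException =>
      throw new RuntimeException(
        s"The RAPIDS Accelerator cannot read encrypted Parquet file $path; " +
        "fall back to the CPU, e.g. by setting " +
        "spark.rapids.sql.format.parquet.read.enabled=false", e)
  }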
PR is merged.
Spark 3.2 added support for columnar encryption in Parquet. We probably cannot detect this at query planning time, but minimally we need to make sure we are not corrupting data when reading encrypted Parquet files, and instead throw an exception. Long-term, we either need to directly support encrypted data or add dynamic, post-planning fallback to the CPU for encrypted Parquet reads.
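For example, a quick spark-shell check of that requirement (a sketch of the desired post-fix behavior, assuming an encrypted file written by the CPU as in the sessions above):

scala> import scala.util.Try
scala> val attempt = Try(spark.read.parquet("./tmp/parquet/encrypted").collect())
scala> assert(attempt.isFailure) // a GPU read of encrypted data must fail loudly, never return wrong rows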