
[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table #4697

Closed
wants to merge 11 commits

Conversation

anselmevignon

The bug solved here was due to a change in PartitionTableScan, when reading a partitioned table.

  • When the partition column is requested out of a parquet table, the table scan needs to add the column back to the output Rows.
  • To update the Row object created by PartitionTableScan, the Row was first cast to SpecificMutableRow before being updated.
  • This cast was unsafe, since there is no guarantee that the NewHadoopRDD used internally will instantiate the output Rows as MutableRow.

In particular, when reading a table with complex (e.g. struct or array) types, the NewHadoopRDD uses a parquet.io.api.RecordMaterializer produced by org.apache.spark.sql.parquet.RowReadSupport. When complex types are involved, the org.apache.spark.sql.parquet.CatalystConverter.createRootConverter factory creates this consumer as an org.apache.spark.sql.parquet.CatalystGroupConverter (a) rather than an org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter (b).

The consumer (a) will output GenericRow, while the consumer (b) produces SpecificMutableRow.

Therefore any request selecting a partition column plus a complex-typed column gets its rows back as GenericRows, and falls into the unsafe casting pit (see https://issues.apache.org/jira/browse/SPARK-5775 for an example).
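To make the failure mode concrete, here is a minimal, self-contained Scala sketch using hypothetical stand-in classes (these are not the real catalyst types, whose constructors differ; only the shape of the bug is shown):

```scala
// Hypothetical stand-ins for the two Row implementations involved.
trait Row { def values: Array[Any] }

// Immutable row: what the converter yields when complex types are involved.
class GenericRow(val values: Array[Any]) extends Row

// Mutable row: what the converter yields for primitive-only schemas.
class SpecificMutableRow(val values: Array[Any]) extends Row {
  def update(i: Int, v: Any): Unit = values(i) = v
}

object CastPitDemo extends App {
  val rowFromParquet: Row = new GenericRow(Array[Any]("a", "b")) // complex-type case
  // The scan blindly assumed a mutable row; this line throws ClassCastException:
  val mutableRow = rowFromParquet.asInstanceOf[SpecificMutableRow]
  mutableRow.update(0, "partitionValue")
}
```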

The fix proposed here originally replaced the unsafe class cast with pattern matching on the Row type, updating the Row in place if it is of a mutable type, and recreating a Row otherwise.

This PR now implements the solution updated by @liancheng in aa39460:

The fix checks whether every requested column is of primitive type, mirroring the check in org.apache.spark.sql.parquet.CatalystConverter.createRootConverter (a condensed sketch follows below):

  • If all columns are of primitive type, the Row can safely be cast to a MutableRow.
  • Otherwise, a new GenericRow is created, and the partition column is written into this new row structure.

This fix is unit-tested in sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
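For reference, a condensed sketch of the shape of this logic (hypothetical names — requestedAttributes, isPrimitiveType, partitionColumnIndex, partitionValue, iterator — and not the verbatim patch; it assumes the materialized row already reserves a slot for the partition column):

```scala
// Condensed sketch of the fix (hypothetical names; not the verbatim patch).
// Mirror the primitive-type check used by CatalystConverter.createRootConverter,
// which picks CatalystPrimitiveRowConverter only for primitive-only schemas.
val allPrimitive = requestedAttributes.forall(a => isPrimitiveType(a.dataType))

iterator.map { row =>
  if (allPrimitive) {
    // Primitive-only schema: the converter produced a SpecificMutableRow,
    // so the cast is safe and the row can be updated in place.
    val mutableRow = row.asInstanceOf[SpecificMutableRow]
    mutableRow.update(partitionColumnIndex, partitionValue)
    mutableRow
  } else {
    // Complex types: the converter produced an immutable GenericRow, so
    // rebuild a fresh row carrying the partition value instead of casting.
    val values = row.toSeq.toArray
    values(partitionColumnIndex) = partitionValue
    new GenericRow(values)
  }
}
```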

@marmbrus
Contributor

/cc @liancheng

@marmbrus
Contributor

ok to test

@SparkQA

SparkQA commented Feb 19, 2015

Test build #27734 timed out for PR 4697 at commit 22cec52 after a configured wait of 120m.

@ayoub-benali

Just tried to reproduce the example in SPARK-5775 with the spark shell, and now it hangs forever at query time.
Maybe that's because the tests don't reproduce the same example as in the issue: an array of structs.

scala> hiveContext.sql("select data.field1 from test_table LATERAL VIEW explode(data_array) nestedStuff AS data").collect
15/02/20 16:32:55 INFO ParseDriver: Parsing command: select data.field1 from test_table LATERAL VIEW explode(data_array) nestedStuff AS data
15/02/20 16:32:55 INFO ParseDriver: Parse Completed
15/02/20 16:32:55 INFO MemoryStore: ensureFreeSpace(260309) called with curMem=97368, maxMem=280248975
15/02/20 16:32:55 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 254.2 KB, free 266.9 MB)
15/02/20 16:32:55 INFO MemoryStore: ensureFreeSpace(28517) called with curMem=357677, maxMem=280248975
15/02/20 16:32:55 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 27.8 KB, free 266.9 MB)
15/02/20 16:32:55 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ****:54658 (size: 27.8 KB, free: 267.2 MB)
15/02/20 16:32:55 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/02/20 16:32:55 INFO SparkContext: Created broadcast 2 from NewHadoopRDD at ParquetTableOperations.scala:119
15/02/20 16:32:55 INFO FileInputFormat: Total input paths to process : 3
15/02/20 16:32:55 INFO ParquetInputFormat: Total input paths to process : 3
15/02/20 16:32:55 INFO ParquetFileReader: Initiating action with parallelism: 5
15/02/20 16:32:55 INFO ParquetFileReader: reading summary file: hdfs://****:8020/path/test_table/date=2015-02-12/_metadata
15/02/20 16:32:55 INFO ParquetFileReader: reading another 1 footers
15/02/20 16:32:55 INFO ParquetFileReader: Initiating action with parallelism: 5
15/02/20 16:32:55 INFO FilteringParquetRowInputFormat: Fetched [LocatedFileStatus{path=hdfs://****:8020/path/test_table/date=2015-02-12/part-r-1.parquet; isDirectory=false; length=463; replication=3; blocksize=134217728; modification_time=1424446345899; access_time=1424446344501; owner=rptn_deploy; group=supergroup; permission=rw-r--r--; isSymlink=false}, LocatedFileStatus{path=hdfs://****:8020/path/test_table/date=2015-02-12/part-r-2.parquet; isDirectory=false; length=731; replication=3; blocksize=134217728; modification_time=1424446346655; access_time=1424446345540; owner=rptn_deploy; group=supergroup; permission=rw-r--r--; isSymlink=false}, LocatedFileStatus{path=hdfs://****:8020/path/test_table/date=2015-02-12/part-r-3.parquet; isDirectory=false; length=727; replication=3; blocksize=134217728; modification_time=1424446346773; access_time=1424446345628; owner=rptn_deploy; group=supergroup; permission=rw-r--r--; isSymlink=false}] footers in 31 ms
15/02/20 16:32:55 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/02/20 16:32:55 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/02/20 16:32:55 INFO FilteringParquetRowInputFormat: Using Task Side Metadata Split Strategy
15/02/20 16:32:55 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
15/02/20 16:32:55 INFO DAGScheduler: Got job 2 (collect at SparkPlan.scala:84) with 3 output partitions (allowLocal=false)
15/02/20 16:32:55 INFO DAGScheduler: Final stage: Stage 2(collect at SparkPlan.scala:84)
15/02/20 16:32:55 INFO DAGScheduler: Parents of final stage: List()
15/02/20 16:32:55 INFO DAGScheduler: Missing parents: List()
15/02/20 16:32:55 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[26] at map at SparkPlan.scala:84), which has no missing parents
15/02/20 16:32:56 INFO MemoryStore: ensureFreeSpace(7616) called with curMem=386194, maxMem=280248975
15/02/20 16:32:56 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 7.4 KB, free 266.9 MB)
15/02/20 16:32:56 INFO MemoryStore: ensureFreeSpace(4225) called with curMem=393810, maxMem=280248975
15/02/20 16:32:56 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.1 KB, free 266.9 MB)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ****:54658 (size: 4.1 KB, free: 267.2 MB)
15/02/20 16:32:56 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/02/20 16:32:56 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:838
15/02/20 16:32:56 INFO DAGScheduler: Submitting 3 missing tasks from Stage 2 (MappedRDD[26] at map at SparkPlan.scala:84)
15/02/20 16:32:56 INFO TaskSchedulerImpl: Adding task set 2.0 with 3 tasks
15/02/20 16:32:56 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 6, ****, NODE_LOCAL, 1639 bytes)
15/02/20 16:32:56 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 7, ****, NODE_LOCAL, 1638 bytes)
15/02/20 16:32:56 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 8, ****, NODE_LOCAL, 1639 bytes)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ****:45208 (size: 4.1 KB, free: 133.6 MB)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ****:52420 (size: 4.1 KB, free: 133.6 MB)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ****:43309 (size: 4.1 KB, free: 133.6 MB)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ****:43309 (size: 27.8 KB, free: 133.6 MB)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ****:52420 (size: 27.8 KB, free: 133.6 MB)
15/02/20 16:32:56 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on ****:45208 (size: 27.8 KB, free: 133.6 MB)
15/02/20 16:32:56 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 8) in 490 ms on **** (1/3)
15/02/20 16:36:01 INFO BlockManager: Removing broadcast 1
15/02/20 16:36:01 INFO BlockManager: Removing block broadcast_1_piece0
15/02/20 16:36:01 INFO MemoryStore: Block broadcast_1_piece0 of size 31176 dropped from memory (free 279882116)
15/02/20 16:36:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on ****:54658 in memory (size: 30.4 KB, free: 267.2 MB)
15/02/20 16:36:01 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/02/20 16:36:01 INFO BlockManager: Removing block broadcast_1
15/02/20 16:36:01 INFO MemoryStore: Block broadcast_1 of size 66192 dropped from memory (free 279948308)
15/02/20 16:36:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on ****:52420 in memory (size: 30.4 KB, free: 133.6 MB)
15/02/20 16:36:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on ****:45208 in memory (size: 30.4 KB, free: 133.6 MB)
15/02/20 16:36:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on ****:43309 in memory (size: 30.4 KB, free: 133.6 MB)
15/02/20 16:36:01 INFO ContextCleaner: Cleaned broadcast 1

@anselmevignon
Author

Hi @ayoub-benali,

When did you pull? Unfortunately I pushed a broken version in my first PR... basically there was an infinite loop in the "fix".

I updated the PR with the correct fix, but only 4 hrs ago (commit 8fc6a8c). Would you mind checking which version you are using?

I actually had a similar, but not identical, issue, so I unit-tested on my own problem. Please tell me if the final fix did not solve yours. There are indeed no tests on arrays of structs, at least not in the Hive unit test deck (only in catalyst).

@SparkQA

SparkQA commented Feb 20, 2015

Test build #27777 has finished for PR 4697 at commit 8fc6a8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ayoub-benali

@anselmevignon I just re-tested and it worked. Thanks 👍

Slightly off topic: I also checked whether this pull request would solve SPARK-5508, but it didn't. That other issue seems linked to the write path (INSERT) rather than the read path (SELECT).

@anselmevignon
Author

@ayoub-benali thanks for the review!

About SPARK-5508: the stack trace looks very much like one I saw when spark.sql.hive.convertMetastoreParquet was badly set. Could that be related? Could it be that the INSERT is writing parquet in an "old format"? (I have never read that part of the code, so no idea on the specifics, sorry...)
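If it helps with triage, here is a quick sketch for checking and toggling that setting from the shell (assuming a HiveContext bound to hiveContext, as in the log above):

```scala
// Read the current value; the second argument is the default when unset.
hiveContext.getConf("spark.sql.hive.convertMetastoreParquet", "true")

// Flip it, then re-run the failing INSERT to see whether the behavior changes.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
```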

@liancheng
Contributor

Hey @anselmevignon, because the 1.3 release is really close, I just made #4792 based on your work, but targeting master and branch-1.3. We can still polish this PR (mainly minor styling issues) and merge it into branch-1.2. Thanks for working on this!

For indentation and whitespace related styling, please refer to equivalent changes in #4792.

asfgit pushed a commit that referenced this pull request Feb 28, 2015
[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table

This PR adapts anselmevignon's #4697 to master and branch-1.3. Please refer to PR description of #4697 for details.

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #4792 from liancheng/spark-5775 and squashes the following commits:

538f506 [Cheng Lian] Addresses comments
cee55cf [Cheng Lian] Merge pull request #4 from yhuai/spark-5775-yin
b0b74fb [Yin Huai] Remove runtime pattern matching.
ca6e038 [Cheng Lian] Fixes SPARK-5775
asfgit pushed a commit that referenced this pull request Feb 28, 2015
[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table

This PR adapts anselmevignon's #4697 to master and branch-1.3. Please refer to PR description of #4697 for details.

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <liancheng@users.noreply.github.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #4792 from liancheng/spark-5775 and squashes the following commits:

538f506 [Cheng Lian] Addresses comments
cee55cf [Cheng Lian] Merge pull request #4 from yhuai/spark-5775-yin
b0b74fb [Yin Huai] Remove runtime pattern matching.
ca6e038 [Cheng Lian] Fixes SPARK-5775

(cherry picked from commit e6003f0)
Signed-off-by: Cheng Lian <lian@databricks.com>
@ayoub-benali

Hi @anselmevignon, would you mind fixing the styling issues so that this PR gets merged into branch-1.2?
If you don't plan to work on it any more, could you allow me to commit to your branch so that I can update this PR? :)

@yhuai
Contributor

yhuai commented Mar 1, 2015

Since it has been fixed in master and branch-1.3, it would be great if we could have the same changes as aa39460 for branch-1.2.

@anselmevignon
Author

Hi @ayoub-benali,

Sorry for the delay, I was OOO at the end of the week.

I will be correcting the style and getting rid of the dynamic type checking (following the changes @yhuai and @liancheng made on the 1.3 branch / master)

That being said, I have also enabled you to write on the branch; if there is anything else, feel free to solve it whichever way you prefer (comment and let me update, or write the change on your own).

cheers,

Anselme

@SparkQA

SparkQA commented Mar 2, 2015

Test build #28176 has finished for PR 4697 at commit 52f73fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

Looks like there are only two small style comments left before this can be merged. Thanks for working on it! Would you mind also updating the description? I believe it still describes the original solution and not the newest version that has been backported.

@anselmevignon
Author

Should be better now. Would you mind commenting here if further updates are needed? I seem to have trouble receiving notifications from the inline comments.
Thanks for the review.

@SparkQA

SparkQA commented Mar 23, 2015

Test build #28985 has finished for PR 4697 at commit 6a4c53d.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Contributor

retest this please

@SparkQA

SparkQA commented Mar 23, 2015

Test build #28994 has finished for PR 4697 at commit 6a4c53d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Mar 23, 2015
[SPARK-5775] BugFix: GenericRow cannot be cast to SpecificMutableRow when nested data and partitioned table

The bug solved here was due to a change in PartitionTableScan, when reading a partitioned table.

- When the partition column is requested out of a parquet table, the table scan needs to add the column back to the output Rows.
- To update the Row object created by PartitionTableScan, the Row was first cast to SpecificMutableRow before being updated.
- This cast was unsafe, since there is no guarantee that the NewHadoopRDD used internally will instantiate the output Rows as MutableRow.

In particular, when reading a table with complex (e.g. struct or array) types, the NewHadoopRDD uses a parquet.io.api.RecordMaterializer produced by org.apache.spark.sql.parquet.RowReadSupport. When complex types are involved, the org.apache.spark.sql.parquet.CatalystConverter.createRootConverter factory creates this consumer as an org.apache.spark.sql.parquet.CatalystGroupConverter (a) rather than an org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter (b).

The consumer (a) will output GenericRow, while the consumer (b) produces SpecificMutableRow.

Therefore any request selecting a partition column plus a complex-typed column gets its rows back as GenericRows, and falls into the unsafe casting pit (see https://issues.apache.org/jira/browse/SPARK-5775 for an example).

The fix proposed here originally replaced the unsafe class cast with pattern matching on the Row type, updating the Row in place if it is of a mutable type, and recreating a Row otherwise.

This PR now implements the solution updated by liancheng in aa39460:

The fix checks whether every requested column is of primitive type, mirroring the check in org.apache.spark.sql.parquet.CatalystConverter.createRootConverter.
 - If all columns are of primitive type, the Row can safely be cast to a MutableRow.
 - Otherwise, a new GenericRow is created, and the partition column is written into this new row structure.

This fix is unit-tested in  sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Author: Anselme Vignon <anselme.vignon@flaminem.com>
Author: Cheng Lian <lian@databricks.com>

Closes #4697 from anselmevignon/local_dev and squashes the following commits:

6a4c53d [Anselme Vignon] style corrections
52f73fc [Cheng Lian] cherry-pick & merge from aa39460
8fc6a8c [Anselme Vignon] correcting tests on temporary tables
24928ea [Anselme Vignon] corrected mirror bug (see SPARK-5775) for newParquet
7c829cb [Anselme Vignon] bugfix, hopefully correct this time
005a7f8 [Anselme Vignon] added test cleanup
22cec52 [Anselme Vignon] lint compatible changes
ae48f7c [Anselme Vignon] unittesting SPARK-5775
f876dea [Anselme Vignon] starting to write tests
dbceaa3 [Anselme Vignon] cutting lines
4eb04e9 [Anselme Vignon] bugfix SPARK-5775
@marmbrus
Contributor

Thanks! Merged to branch-1.2

@marmbrus
Contributor

Mind closing this now? PRs to branches other than master do not auto close.

@anselmevignon
Author

I'm closing it. Thanks a lot everyone for the merge, the review, and the patience with my newbyism :)

Anselme

@anirudhcelebal

root
|-- adultbasefare: long (nullable = true)
|-- adultcommission: long (nullable = true)
|-- adultservicetax: long (nullable = true)
|-- adultsurcharge: long (nullable = true)
|-- airline: string (nullable = true)
|-- arrdate: string (nullable = true)
|-- arrtime: string (nullable = true)
|-- cafecommission: long (nullable = true)
|-- carrierid: string (nullable = true)
|-- class: string (nullable = true)
|-- depdate: string (nullable = true)
|-- deptime: string (nullable = true)
|-- destination: string (nullable = true)
|-- discount: long (nullable = true)
|-- duration: string (nullable = true)
|-- fare: struct (nullable = true)
| |-- A: long (nullable = true)
| |-- C: long (nullable = true)
| |-- I: long (nullable = true)
| |-- adultairlinetxncharge: long (nullable = true)
| |-- adultairporttax: long (nullable = true)
| |-- adultbasefare: long (nullable = true)
| |-- adultcommission: double (nullable = true)
| |-- adultsurcharge: long (nullable = true)
| |-- adulttotalfare: long (nullable = true)
| |-- childairlinetxncharge: long (nullable = true)
| |-- childairporttax: long (nullable = true)
| |-- childbasefare: long (nullable = true)
| |-- childcommission: double (nullable = true)
| |-- childsurcharge: long (nullable = true)
| |-- childtotalfare: long (nullable = true)
| |-- discount: long (nullable = true)
| |-- infantairlinetxncharge: long (nullable = true)
| |-- infantairporttax: long (nullable = true)
| |-- infantbasefare: long (nullable = true)
| |-- infantcommission: long (nullable = true)
| |-- infantsurcharge: long (nullable = true)
| |-- infanttotalfare: long (nullable = true)
| |-- servicetax: long (nullable = true)
| |-- totalbasefare: long (nullable = true)
| |-- totalcommission: double (nullable = true)
| |-- totalfare: long (nullable = true)
| |-- totalsurcharge: long (nullable = true)
| |-- transactionfee: long (nullable = true)
|-- farebasis: string (nullable = true)
|-- farerule: string (nullable = true)
|-- flightcode: string (nullable = true)
|-- flightno: string (nullable = true)
|-- k: string (nullable = true)
|-- onwardflights: array (nullable = true)
| |-- element: string (containsNull = true)
|-- origin: string (nullable = true)
|-- promocode: string (nullable = true)
|-- promodiscount: long (nullable = true)
|-- promotionText: string (nullable = true)
|-- stops: string (nullable = true)
|-- tickettype: string (nullable = true)
|-- totalbasefare: long (nullable = true)
|-- totalcommission: long (nullable = true)
|-- totalfare: long (nullable = true)
|-- totalpriceamount: long (nullable = true)
|-- totalsurcharge: long (nullable = true)
|-- transactionfee: long (nullable = true)
|-- viacharges: long (nullable = true)
|-- warnings: string (nullable = true)

Now I want to flatten it so that the fare field is removed and everything is flattened.

For this I used explode, but I am getting an error:

org.apache.spark.sql.AnalysisException: cannot resolve 'explode(fare)' due to data type mismatch: input to function explode should be array or map type, not StructType(StructField(A,LongType,true), StructField(C,LongType,true), StructField(I,LongType,true), StructField(adultairlinetxncharge,LongType,true), StructField(adultairporttax,LongType,true), StructField(adultbasefare,LongType,true), StructField(adultcommission,DoubleType,true), StructField(adultsurcharge,LongType,true), StructField(adulttotalfare,LongType,true), StructField(childairlinetxncharge,LongType,true), StructField(childairporttax,LongType,true), StructField(childbasefare,LongType,true), StructField(childcommission,DoubleType,true), StructField(childsurcharge,LongType,true), StructField(childtotalfare,LongType,true), StructField(discount,LongType,true), StructField(infantairlinetxncharge,LongType,true), StructField(infantairporttax,LongType,true), StructField(infantbasefare,LongType,true), StructField(infantcommission,LongType,true), StructField(infantsurcharge,LongType,true), StructField(infanttotalfare,LongType,true), StructField(servicetax,LongType,true), StructField(totalbasefare,LongType,true), StructField(totalcommission,DoubleType,true), StructField(totalfare,LongType,true), StructField(totalsurcharge,LongType,true), StructField(transactionfee,LongType,true));

If not explode, how can I flatten it? Your help will be appreciated. Thanks

@marmbrus
Contributor

marmbrus commented Mar 7, 2016

@anirudhcelebal please ask questions on the spark-user list instead of on PRs.

explode only works with arrays. You probably want something like SELECT fare.* ...
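In case it helps, a minimal sketch of that suggestion (assuming the DataFrame above is bound to df and a SQLContext to sqlContext; the table name flights is made up):

```scala
// explode applies only to arrays/maps; a struct column is flattened by
// expanding its fields with `fare.*`. The table name is hypothetical.
df.registerTempTable("flights")
val flattened = sqlContext.sql("SELECT fare.*, airline, origin, destination FROM flights")
flattened.printSchema()  // fare's nested fields now appear as top-level columns
```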
