[SPARK-20986] [SQL] Reset table's statistics after PruneFileSourcePartitions rule. #18205
Conversation
test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTempView("tempTbl", "partTbl") {
    spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
    sql("CREATE TABLE partTbl (id INT) PARTITIONED BY (part INT) STORED AS parquet")
Hi, @lianhuiwang. Could we use withTable("partTbl") instead of withTempView(..., "partTbl")?
Yes, thanks.
Test build #77744 has finished for PR 18205 at commit
Test build #77763 has finished for PR 18205 at commit
Test build #77842 has finished for PR 18205 at commit
Can you please explain why
OK. I get your point. But the test case does not clearly show the problem. We can first analyze the table to fill stats in
val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)

val withStats = logicalRelation.catalogTable.map(_.copy(
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
Add a comment here indicating we are resetting stats based on the pruned file size?
Yes, Thanks.
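The change under review can be illustrated with a minimal sketch. The case classes below are simplified, hypothetical stand-ins for Spark's CatalogStatistics, CatalogTable, and LogicalRelation, not the real API:

```scala
// Simplified, hypothetical stand-ins for Spark's catalog types (not the real API).
case class CatalogStatistics(sizeInBytes: BigInt)
case class CatalogTable(name: String, stats: Option[CatalogStatistics])
case class LogicalRelation(catalogTable: Option[CatalogTable])

// Mirrors the rule's idea: after partition pruning, rebuild the catalog
// table so its statistics reflect only the size of the surviving files.
def resetStats(rel: LogicalRelation, prunedSizeInBytes: Long): LogicalRelation = {
  val withStats = rel.catalogTable.map(_.copy(
    stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedSizeInBytes)))))
  rel.copy(catalogTable = withStats)
}

val before = LogicalRelation(Some(CatalogTable("partTbl", Some(CatalogStatistics(BigInt(1000))))))
val after = resetStats(before, 200L)
```

Before pruning the stats report the full 1000 bytes; after pruning they report only the 200 bytes of the surviving partitions, which is what downstream cost-based decisions (e.g. broadcast-join selection) should see.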
val df = sql("SELECT * FROM partTbl where part = 1")
val query = df.queryExecution.analyzed.analyze
val sizes1 = query.collect {
  case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
We'd better not compute stats for an analyzed plan. We can use spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats to query the catalog stats.
Yes, Thanks.
val sizes2 = Optimize.execute(query).collect {
  case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
}
assert(sizes2.size === 1, s"Size wrong for:\n ${df.queryExecution}")
Should we assert that the new size in catalog stats is larger than the previous one, and equal to computeStats(conf).sizeInBytes?
I don't think it has changed the stats of the catalog. After the optimizer runs, the size in catalog stats is larger than computeStats(conf).sizeInBytes because of the partition pruning.
LogicalRelation overrides computeStats, and it will use CatalogStatistics if it exists.
"I don't think it has changed the stats of the catalog."
Don't we reset the catalog stats using the pruned size here?
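The point about LogicalRelation can be sketched with the same kind of simplified, hypothetical stand-ins: computeStats prefers catalog statistics when they are present and only falls back to the relation's own file-based size otherwise.

```scala
// Hypothetical, simplified model of the fallback behavior discussed above.
case class Statistics(sizeInBytes: BigInt)
case class CatalogStatistics(sizeInBytes: BigInt)
case class CatalogTable(stats: Option[CatalogStatistics])

case class SimpleRelation(catalogTable: Option[CatalogTable], fileSizeInBytes: Long) {
  // Prefer CatalogStatistics when it exists; otherwise fall back to file size.
  def computeStats: Statistics =
    catalogTable.flatMap(_.stats)
      .map(s => Statistics(s.sizeInBytes))
      .getOrElse(Statistics(BigInt(fileSizeInBytes)))
}

val withCatalogStats = SimpleRelation(Some(CatalogTable(Some(CatalogStatistics(BigInt(200))))), 1000L)
val withoutCatalogStats = SimpleRelation(Some(CatalogTable(None)), 1000L)
```

This is why resetting the CatalogStatistics on the copied table is enough for the optimizer to pick up the pruned size.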
Test build #77892 has finished for PR 18205 at commit
val df = sql("SELECT * FROM partTbl where part = 1")
val query = df.queryExecution.analyzed.analyze
val sizes1 = query.collect {
  case relation: LogicalRelation => relation.computeStats(conf).sizeInBytes
Can we get catalog stats by relation.catalogTable.get.stats.get here and check it? I just think we need to cover this reset code path.
Yes, Thanks.
}

val tableName = "partTbl"
sql(s"analyze table partTbl compute STATISTICS")
nit: ANALYZE TABLE partTbl COMPUTE STATISTICS
Yes, Thanks.
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
  val df = sql("SELECT * FROM partTbl where part = 1")
  val query = df.queryExecution.analyzed.analyze
nit: just df.queryExecution.analyzed?
Because there is a SubqueryAlias plan, I think we need analyze() to eliminate it.
But why do we need to eliminate SubqueryAlias here?
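The SubqueryAlias issue can be shown with a hypothetical miniature (the types below are toy stand-ins, not Spark's plan classes): the analyzed plan may wrap the relation in an alias node, which analyze()/EliminateSubqueryAliases strips, so pattern matches on the relation itself succeed.

```scala
// Toy model of a plan tree with an alias wrapper node.
sealed trait Plan
case class SubqueryAlias(name: String, child: Plan) extends Plan
case class Relation(table: String) extends Plan

// Recursively strip alias wrappers, mimicking EliminateSubqueryAliases.
def eliminateAliases(plan: Plan): Plan = plan match {
  case SubqueryAlias(_, child) => eliminateAliases(child)
  case other => other
}

val analyzed = SubqueryAlias("parttbl", Relation("partTbl"))
val cleaned = eliminateAliases(analyzed)
```

A collect that matches only on Relation nodes would work either way, which is likely why the reviewer questions whether the extra analyze() call is needed.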
test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTempView("tempTbl") {
    withTable("partTbl") {
      spark.range(1000).selectExpr("id").createOrReplaceTempView("tempTbl")
For this test, we can use a much smaller size (e.g. 10) to accelerate testing.
Yes, Thanks.
@wzhfy I have addressed your comments. Thanks.
Test build #77934 has finished for PR 18205 at commit
assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes1(0) == tableStats.get.sizeInBytes)
val sizes2 = Optimize.execute(query).collect {
  case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes
Fixed in the wrong place? For size1, could you get the catalog stats? We'd better not call computeStats on an analyzed plan. After optimization, for size2 or size3, we can get sizes from both the catalog stats and computeStats, and see if they are equal and larger than size1.
Yes, thanks. I have updated it.
Test build #77958 has finished for PR 18205 at commit
LGTM, ping @cloud-fan
// Change table stats based on the sizeInBytes of pruned files
val withStats = logicalRelation.catalogTable.map(_.copy(
  stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes)))))
Do we ignore all column stats here?
Yes. Now it replaces the stats of CatalogTable with a new CatalogStatistics(), like DetermineTableStats does.
Column stats are collected at table level, but here we need partition-specific stats, so we can ignore column stats.
Ah, actually we have to: the column stats are table-level and invalid for partitions.
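A small sketch of the column-stats point, again with hypothetical stand-in types: the replacement CatalogStatistics carries only a size, so table-level column stats, which would be invalid for a pruned subset of partitions, do not survive the copy.

```scala
// Hypothetical stand-ins; colStats models table-level column statistics.
case class ColumnStat(distinctCount: BigInt)
case class CatalogStatistics(sizeInBytes: BigInt,
                             colStats: Map[String, ColumnStat] = Map.empty)
case class CatalogTable(stats: Option[CatalogStatistics])

val original = CatalogTable(Some(CatalogStatistics(BigInt(1000),
  Map("id" -> ColumnStat(BigInt(1000))))))

// Resetting with only the pruned size: column stats are deliberately dropped,
// since they describe the whole table, not the surviving partitions.
val reset = original.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(200))))
```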
test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule") {
  withTempView("tempTbl") {
    withTable("partTbl") {
      spark.range(10).selectExpr("id").createOrReplaceTempView("tempTbl")
To simplify the test:
spark.range(10).select('id, 'id % 3 as 'p).write.partitionBy("p").saveAsTable("t")
Yes, great, thanks.
}
assert(sizes1.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizes1(0) == tableStats.get.sizeInBytes)
val relations = Optimize.execute(query).collect {
df.queryExecution.optimized
}
assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}")
val size2 = relations(0).computeStats(conf).sizeInBytes
val size3 = relations(0).catalogTable.get.stats.get.sizeInBytes
nit:
assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes)
assert(size2 < tableStats.get.sizeInBytes)
@cloud-fan I have addressed your comments. Thanks.
Test build #77989 has finished for PR 18205 at commit
We have another related PR: #14655
Thanks, merging to master/2.2!
…itions rule. ## What changes were proposed in this pull request? After the PruneFileSourcePartitions rule, the table's statistics need to be reset, because PruneFileSourcePartitions can filter out some unnecessary partitions, so the statistics need to be changed. ## How was this patch tested? Added a unit test. Author: lianhuiwang <lianhuiwang09@gmail.com> Closes #18205 from lianhuiwang/SPARK-20986. (cherry picked from commit 8b5b2e2) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan Thanks.
What changes were proposed in this pull request?
After the PruneFileSourcePartitions rule, the table's statistics need to be reset, because PruneFileSourcePartitions can filter out some unnecessary partitions, so the statistics need to be changed accordingly.
How was this patch tested?
Added a unit test.