OR condition does not leverage all parquet metadata (metrics, dictionary, bloom filter) causing inefficient queries #10029
@cccs-jc Thanks a lot for your thorough investigation and analysis! The problem you described will also occur without a bloom filter. Let's use a where clause of the form `col1 = 1 OR col2 = val`. The `statsFilter` will determine that col1's value 1 is within the range of 0 to 5, so it returns `shouldRead=True`. The `dictFilter` will determine that the value 1 is not in the dictionary of col1, but it cannot rule out the other side of the OR, so it also returns `shouldRead=True`. Since both the `statsFilter` and the `dictFilter` return True for the OR expression as a whole, the row group is read. It would be ideal if we could combine the stats and dictionary information per column.
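This gap can be illustrated with a toy, self-contained sketch (all names and values are hypothetical; plain maps and sets stand in for Parquet column stats and dictionaries): each filter evaluates the whole OR on its own and answers "might match", while combining the same evidence per column would let us skip the row group.

```java
import java.util.*;

// Toy model of the situation described above: two row-group "filters" that
// each evaluate `col1 = 1 OR col2 = 7` independently, versus one evaluation
// that combines stats and dictionary evidence per column.
public class OrFilterGap {
  // Column metadata for one row group (illustrative values only).
  static Map<String, long[]> ranges = Map.of("col1", new long[] {0, 5},
                                             "col2", new long[] {10, 20});
  static Map<String, Set<Long>> dicts = Map.of("col1", Set.of(0L, 2L, 3L)); // col2 has no dictionary

  static boolean statsMightMatch(String col, long v) {
    long[] r = ranges.get(col);
    return v >= r[0] && v <= r[1];
  }

  static boolean dictMightMatch(String col, long v) {
    Set<Long> d = dicts.get(col);
    return d == null || d.contains(v); // no dictionary -> cannot rule it out
  }

  // Current behavior: each filter evaluates the whole OR on its own.
  static boolean independent() {
    boolean stats = statsMightMatch("col1", 1) || statsMightMatch("col2", 7);
    boolean dict = dictMightMatch("col1", 1) || dictMightMatch("col2", 7);
    return stats && dict; // both true -> row group is read
  }

  // Combined: AND the evidence per column, then OR across the predicate.
  static boolean combined() {
    boolean col1 = statsMightMatch("col1", 1) && dictMightMatch("col1", 1);
    boolean col2 = statsMightMatch("col2", 7) && dictMightMatch("col2", 7);
    return col1 || col2; // false -> row group can be skipped
  }

  public static void main(String[] args) {
    System.out.println("independent shouldRead = " + independent()); // true
    System.out.println("combined    shouldRead = " + combined());    // false
  }
}
```

Each filter alone returns true (the stats can't prune col1's side, the dictionary can't prune col2's side), so the row group is read; the per-column combination prunes both sides.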
@huaxingao You are absolutely correct; the issue also arises when combining only the stats and dictionary filters. The crux of the issue lies in the evaluation of OR conditions such as `col1 = 1 OR col2 = val`. Ideally, for every column, we would want to combine the information from stats, dict and bloom. This sounds like a "Chain of Responsibility." So instead of the current implementation:

```java
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive);

boolean shouldRead =
    filter == null
        || (statsFilter.shouldRead(typeWithIds, rowGroup)
            && dictFilter.shouldRead(
                typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))
            && bloomFilter.shouldRead(
                typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup)));
```

I propose the following:

```java
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive);

dictFilter.setDict(reader.getDictionaryReader(rowGroup));
bloomFilter.setBloom(reader.getBloomFilterDataReader(rowGroup));

// Chain statsFilter -> bloomFilter -> dictFilter
bloomFilter.setNext(dictFilter);
statsFilter.setNext(bloomFilter);

boolean shouldRead =
    filter == null
        || statsFilter.shouldRead(typeWithIds, rowGroup);
```

In this setup, when the stats filter cannot rule out a predicate on its own, it delegates that predicate to the next filter in the chain. This approach effectively combines all the information (metrics, bloom, dict) for a given column before deciding whether that side of the expression might match. Finally, the per-column results are combined according to the expression's AND/OR structure to produce a single `shouldRead` decision. Does this seem reasonable?
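A minimal sketch of what such a chain could look like (names, signatures, and values are hypothetical, not the actual Iceberg API): each link rules out what it can and otherwise defers to the next link, so the per-predicate verdict combines every source of evidence.

```java
import java.util.*;

// Chain-of-responsibility sketch: a stats link backed by min/max ranges and
// a dictionary link backed by column dictionaries. All names are hypothetical.
public class ChainSketch {
  interface FilterLink {
    boolean mightMatch(String col, long value);
  }

  static class StatsLink implements FilterLink {
    final Map<String, long[]> ranges;
    final FilterLink next;
    StatsLink(Map<String, long[]> ranges, FilterLink next) {
      this.ranges = ranges;
      this.next = next;
    }
    public boolean mightMatch(String col, long v) {
      long[] r = ranges.get(col);
      if (r != null && (v < r[0] || v > r[1])) {
        return false; // min/max stats alone rule this predicate out
      }
      return next == null || next.mightMatch(col, v); // defer down the chain
    }
  }

  static class DictLink implements FilterLink {
    final Map<String, Set<Long>> dicts;
    final FilterLink next;
    DictLink(Map<String, Set<Long>> dicts, FilterLink next) {
      this.dicts = dicts;
      this.next = next;
    }
    public boolean mightMatch(String col, long v) {
      Set<Long> d = dicts.get(col);
      if (d != null && !d.contains(v)) {
        return false; // fully dictionary-encoded column without the value
      }
      return next == null || next.mightMatch(col, v);
    }
  }

  // Chain stats -> dict, then evaluate `col1 = 1 OR col2 = 7` per column.
  static boolean shouldRead() {
    FilterLink chain = new StatsLink(
        Map.of("col1", new long[] {0, 5}, "col2", new long[] {10, 20}),
        new DictLink(Map.of("col1", Set.of(0L, 2L, 3L)), null));
    return chain.mightMatch("col1", 1) || chain.mightMatch("col2", 7);
  }

  public static void main(String[] args) {
    System.out.println("shouldRead = " + shouldRead()); // false: row group skipped
  }
}
```

Because each leaf predicate sees the whole chain, col1's side is pruned by the dictionary and col2's side by the stats, and the OR as a whole becomes skippable.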
@cccs-jc Thanks for your proposal! Your suggested modification seems logically sound. However, the change you proposed may require modifications to the current predicate evaluation model; I suspect this could necessitate quite a few adjustments.
@cccs-jc @huaxingao I've met the same issue before: because the three row-group filters cannot work together, some query expressions containing OR cannot filter data. I did some work to solve this in #6893, which draws on the idea of ResidualEvaluator to let the three row-group filters work together.
This weekend I fixed the issue with the three row-group filters not working together. The results are quite impressive: 11 seconds vs 396.

What I did is implement a new visitor that combines the three evaluations; its `eq` method, for example:

```java
@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
  return visitors.stream().allMatch(v -> v.eq(ref, lit) == ROWS_MIGHT_MATCH);
}
```

For every column it sequentially tests the metrics, dictionary and bloom; if all of them return ROWS_MIGHT_MATCH, then `shouldRead=True` is returned. Here is my dev branch: https://github.com/CybercentreCanada/iceberg/tree/parquet_column_eval. @zhongyujiang I'll have a look at your PR today.
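The `allMatch` idea can be sketched end to end like this (a simplification: the real branch uses Iceberg's visitor machinery over `BoundReference`/`Literal`; here plain `BiPredicate` lambdas stand in for the metrics and dictionary visitors, and all names are hypothetical):

```java
import java.util.*;
import java.util.function.BiPredicate;

// Sketch of the combined visitor: a leaf predicate "might match" only if
// every evidence source agrees, and OR/AND nodes then combine those leaf
// verdicts at the expression level.
public class CombinedVisitorSketch {
  final List<BiPredicate<String, Long>> visitors;

  CombinedVisitorSketch(List<BiPredicate<String, Long>> visitors) {
    this.visitors = visitors;
  }

  // Analogue of the eq() override quoted above: all sources must agree.
  boolean eq(String col, long value) {
    return visitors.stream().allMatch(v -> v.test(col, value));
  }

  static boolean demo() {
    Map<String, long[]> ranges = Map.of("SRC_IP", new long[] {0, 5},
                                        "DST_IP", new long[] {10, 20});
    Map<String, Set<Long>> dicts = Map.of("SRC_IP", Set.of(0L, 2L, 3L));

    CombinedVisitorSketch visitor = new CombinedVisitorSketch(List.of(
        // metrics: the value must fall inside the column's min/max range
        (col, v) -> !ranges.containsKey(col)
            || (v >= ranges.get(col)[0] && v <= ranges.get(col)[1]),
        // dictionary: a fully dictionary-encoded column must contain the value
        (col, v) -> !dicts.containsKey(col) || dicts.get(col).contains(v)));

    // `SRC_IP = 1 OR DST_IP = 1`: each eq() leaf already combines all the
    // sources, so the OR can be pruned even though no single filter could.
    return visitor.eq("SRC_IP", 1L) || visitor.eq("DST_IP", 1L);
  }

  public static void main(String[] args) {
    System.out.println("shouldRead = " + demo()); // false
  }
}
```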
I've been following this thread, and after thinking about the proposed solution and going through the code a bit more, I think @cccs-jc's approach is logically sound. This is very promising @cccs-jc, I'll take a look at the PR. This is one where we'll want to be sure of good coverage via test cases, since it's on the critical path for correctness. Also, I noticed you raised it against the 1.5 release branch; when you get a chance, could you raise it against main instead?
@amogh-jahagirdar I'm going to apply this patch to our internal deployment of Iceberg 1.5 and will likely run with it for a while. At the same time I will create a PR to the main branch of the official Iceberg repo. I agree this needs good testing; I have started writing unit tests specifically for this. Luckily I did not have to modify the metrics, dictionary and bloom visitors too much; the biggest change is in the way they are combined. @huaxingao @RussellSpitzer @zhongyujiang are we in agreement with the approach taken in this patch: https://github.com/CybercentreCanada/iceberg/tree/parquet_column_eval? I think this is a substantial improvement over the current implementation.
@cccs-jc @amogh-jahagirdar We know that currently, the metric filter, dict filter, and bloom filter are evaluated in sequential order, with short-circuiting logic. This is because the metric filter incurs very little overhead, as it utilizes stats information stored in the Parquet footer. However, the dict filter needs to read the dict page in the row group, resulting in additional I/O and decoding overhead. In the current logic, if the evaluation result of the metric filter is false, there is no need to evaluate the dict filter.
Another advantage of using the residual evaluator is that the computed residual expression can enhance the filtering effect of subsequent Parquet page filters, if we were to support page filters in the future.
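The residual idea can be illustrated with a toy sketch (this is not Iceberg's actual ResidualEvaluator API; it handles only a flat OR of equality predicates, with hypothetical min/max maps standing in for Parquet stats): the cheap stats pass drops the terms it can rule out, and only what survives is handed to the more expensive dictionary/bloom (or, eventually, page-level) filters.

```java
import java.util.*;

public class ResidualSketch {
  // A predicate of the form `col == value`.
  record Pred(String col, long value) {}

  // Evaluate an OR of predicates against per-column min/max ranges.
  // Predicates the stats can rule out are dropped; what remains is the
  // residual that the next, more expensive filter must evaluate.
  static List<Pred> residual(List<Pred> orTerms, Map<String, long[]> ranges) {
    List<Pred> remaining = new ArrayList<>();
    for (Pred p : orTerms) {
      long[] r = ranges.get(p.col());
      boolean mightMatch = r == null || (p.value() >= r[0] && p.value() <= r[1]);
      if (mightMatch) {
        remaining.add(p); // the stats could not rule this term out
      }
    }
    return remaining; // empty residual -> the whole row group can be skipped
  }

  public static void main(String[] args) {
    Map<String, long[]> ranges = Map.of("SRC_IP", new long[] {0, 5},
                                        "DST_IP", new long[] {10, 20});
    List<Pred> or = List.of(new Pred("SRC_IP", 1), new Pred("DST_IP", 7));
    // Only SRC_IP = 1 survives the stats pass; the dict filter now sees just
    // that term, and if 1 is absent from SRC_IP's dictionary the row group
    // is skipped without ever consulting DST_IP's dictionary or bloom.
    System.out.println(residual(or, ranges));
  }
}
```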
@zhongyujiang I double checked and yes, the
@cccs-jc After further recollection, I believe I now remember the exact reason why I initially chose to use the residual filter:
@zhongyujiang I agree that using the residual approach is more efficient. I took a closer look at your code and it's starting to make sense to me. I vote for your implementation. @rdblue This is an important improvement for us. Could you give your opinion on PR #6893 and my branch https://github.com/CybercentreCanada/iceberg/tree/parquet_column_eval? Thank you.
Apache Iceberg version
1.4.3
Query engine
Spark
Please describe the bug 🐞
I'm testing a table of flow data with a schema of `SRC_IP long, DST_IP long`. I did a thorough investigation and there really seems to be a problem with how OR conditions are evaluated when dictionaries and blooms are used.
Here I have done a search for a random IP value. I do it 10 times to get a precise average execution time.
I use a 512MB target file size and vary the row group size from 128MB to 16MB. I also ran one test with files that have no blooms in them. All files are zordered by SRC and DST IP.
The where clauses are:
- `SRC_IP=val`
- `DST_IP=val`
- `SRC_IP=val AND DST_IP=val`
- `SRC_IP=val OR DST_IP=val`
As you can see, with 128MB row groups a query on SRC takes on average 4.07 seconds and DST takes 5.11 seconds, but an OR takes on average 72 seconds. This is in contrast with the no-bloom scenario, where the queries take 17 seconds, 46 seconds, and, for the OR, 101 seconds.
Looking at the Iceberg code, I suspect the issue is as follows. There are 3 types of expression evaluators:
- `ParquetMetricsRowGroupFilter`
- `ParquetDictionaryRowGroupFilter`
- `ParquetBloomRowGroupFilter`

The `ReadConf` applies these 3 evaluators like so (parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java, line 118 at c9795fd). If any of the 3 evaluators says it's not possible for a particular value to be found in a file's row group, then that row group will be skipped.
Let's say the where clause is `SRC_IP=1 OR DST_IP=1`, and let's suppose both columns are dictionary encoded. The `dictFilter` will determine that the value 1 is not in the dictionary of SRC_IP and not in the dictionary of DST_IP, so it returns `shouldRead=False`.

However, suppose SRC_IP is dictionary encoded and DST_IP is not (it uses a bloom). Then when the `dictFilter` evaluates `DST_IP=1`, it returns `shouldRead=True` because there is no dictionary, so it can't rule it out. Conversely, when the `bloomFilter` tests `SRC_IP=1` and determines there is no bloom on SRC_IP (it is dictionary encoded), it returns `shouldRead=True` because, again, it can't rule it out.

The results are combined by `ReadConf` and yield `shouldRead=True`, even though, based on the dictionary of SRC_IP and the bloom information on DST_IP, we should have skipped the row group. But since the evaluation is done independently, neither evaluator can make that decision.

To prove my hypothesis I created a new dataset where I set `write.parquet.dict-size-bytes` very low to make sure dictionary encoding is not used. Since the columns are not dictionary encoded, they use a bloom filter. The results show that when both the SRC_IP and DST_IP columns use a bloom, it is fast to evaluate the `OR` condition: 10 seconds compared to 379 seconds.

I propose that the `ParquetDictionaryRowGroupFilter` and `ParquetBloomRowGroupFilter` be combined into a single evaluator that tests every column. Each column would be tested either using a dictionary or a bloom.
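A sketch of what such a single evaluator could look like (hypothetical names and values; a plain `Set` stands in for a real bloom filter, which would answer "might contain" with possible false positives):

```java
import java.util.*;

// One evaluator that tests every column with whatever structure that column
// actually has: its dictionary if it is dictionary-encoded, otherwise its
// bloom filter. All names are hypothetical.
public class DictOrBloomSketch {
  final Map<String, Set<Long>> dicts;   // fully dictionary-encoded columns
  final Map<String, Set<Long>> blooms;  // bloom-filtered columns (Set as stand-in)

  DictOrBloomSketch(Map<String, Set<Long>> dicts, Map<String, Set<Long>> blooms) {
    this.dicts = dicts;
    this.blooms = blooms;
  }

  boolean mightMatch(String col, long v) {
    if (dicts.containsKey(col)) {
      return dicts.get(col).contains(v);  // exact: the dictionary holds all values
    }
    if (blooms.containsKey(col)) {
      return blooms.get(col).contains(v); // approximate in reality (false positives)
    }
    return true; // nothing to test against, cannot rule the value out
  }

  static boolean demo() {
    // SRC_IP is dictionary-encoded, DST_IP carries a bloom filter.
    DictOrBloomSketch eval = new DictOrBloomSketch(
        Map.of("SRC_IP", Set.of(2L, 3L, 4L)),
        Map.of("DST_IP", Set.of(10L, 11L)));

    // `SRC_IP=1 OR DST_IP=1`: both sides can now be ruled out by the same
    // evaluator, so the row group is skipped.
    return eval.mightMatch("SRC_IP", 1) || eval.mightMatch("DST_IP", 1);
  }

  public static void main(String[] args) {
    System.out.println("shouldRead = " + demo()); // false
  }
}
```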
@huaxingao @RussellSpitzer