
OR condition does not leverage all parquet metadata (metrics, dictionary, bloom filter) causing inefficient queries #10029

Open
cccs-jc opened this issue Mar 22, 2024 · 11 comments · May be fixed by #10090
Labels
bug Something isn't working

Comments

@cccs-jc
Contributor

cccs-jc commented Mar 22, 2024

Apache Iceberg version

1.4.3

Query engine

Spark

Please describe the bug 🐞

I'm testing a table of flow data with a schema of SRC_IP long, DST_IP long

I did a thorough investigation and there really seems to be a problem with how bloom filters are used.

Here I have done a search for a random IP value. I do it 10 times to get a precise average execution time.

I use a 512MB target file size and vary the row group size from 128MB to 16MB. I also ran one test with files that have no blooms in them. All files are Z-ordered by SRC and DST IP.

The where clauses are SRC_IP=val, DST_IP=val, SRC_IP=val AND DST_IP=val, SRC_IP=val OR DST_IP=val

('R:128MB', 'SRC', 4.070408272743225)
('R:64MB', 'SRC', 3.479648399353027)
('R:32MB', 'SRC', 7.69552972316742)
('R:16MB', 'SRC', 12.549865365028381)
('R:128MB no bloom', 'SRC', 17.634950709342956)
('R:128MB', 'DST', 5.119180655479431)
('R:64MB', 'DST', 5.0318292617797855)
('R:32MB', 'DST', 8.04975097179413)
('R:16MB', 'DST', 16.09592936038971)
('R:128MB no bloom', 'DST', 46.66901330947876)
('R:128MB', 'AND', 2.262153959274292)
('R:64MB', 'AND', 2.3894467115402223)
('R:32MB', 'AND', 4.230756330490112)
('R:16MB', 'AND', 8.178192615509033)
('R:128MB no bloom', 'AND', 6.790379118919373)
('R:128MB', 'OR', 72.06906585693359)
('R:64MB', 'OR', 34.00628011226654)
('R:32MB', 'OR', 25.374402904510497)
('R:16MB', 'OR', 24.86290364265442)
('R:128MB no bloom', 'OR', 101.38280701637268)

As you can see, with 128MB row groups a query on SRC takes on average 4.07 seconds and DST takes 5.11 seconds, but an OR takes on average 72 seconds.
This is in contrast with the no-bloom scenario, where SRC takes 17 seconds, DST 46 seconds, and the OR 101 seconds.

Looking at the Iceberg code, I suspect the issue is as follows:

There are 3 types of expression evaluators:
ParquetMetricsRowGroupFilter
ParquetDictionaryRowGroupFilter
ParquetBloomRowGroupFilter

The ReadConf applies these 3 evaluators like so:

boolean shouldRead =
          filter == null
              || (statsFilter.shouldRead(typeWithIds, rowGroup)
                  && dictFilter.shouldRead(
                      typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))
                  && bloomFilter.shouldRead(
                      typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup)));

If any of the 3 evaluators says it's not possible for a particular value to be found in a file's row group, then that row group will be skipped.

Let's say the where clause is SRC_IP=1 OR DST_IP=1

And let's suppose both columns are dictionary encoded.

The dictFilter will determine that the value 1 is not in the dictionary of SRC_IP and not in the dictionary of DST_IP. So it returns shouldRead=False.

However, suppose SRC_IP is dictionary encoded and DST_IP is not (it uses a bloom filter).

Then when the dictFilter evaluates DST_IP=1, it returns shouldRead=True because there is no dictionary, so it can't rule it out.

Conversely, when the bloomFilter tests SRC_IP=1 and determines there is no bloom on SRC_IP (it is dictionary encoded), it returns shouldRead=True because, again, it can't rule it out.

The results are combined by ReadConf and yield shouldRead=True, even though based on the dictionary of SRC_IP and the bloom information on DST_IP we should have skipped the row group. But since the evaluation is done independently, neither evaluator can make that decision.
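To make the interaction concrete, here is a minimal sketch (in Python, purely illustrative — none of these names are Iceberg code) of how AND-ing the three independently computed OR-results loses pruning power, versus combining the evidence per column:

```python
# Illustrative sketch only. Setup: SRC_IP is dictionary encoded (no
# bloom), DST_IP has a bloom filter (no dictionary), and the searched
# value is in neither.

MIGHT_MATCH, CANNOT_MATCH = True, False

def dict_filter_or():
    # dictFilter evaluates SRC_IP=1 OR DST_IP=1 on its own:
    src = CANNOT_MATCH   # value absent from SRC_IP's dictionary
    dst = MIGHT_MATCH    # DST_IP has no dictionary -> can't rule out
    return src or dst

def bloom_filter_or():
    # bloomFilter evaluates the same expression on its own:
    src = MIGHT_MATCH    # SRC_IP has no bloom -> can't rule out
    dst = CANNOT_MATCH   # value absent from DST_IP's bloom filter
    return src or dst

# Independent evaluation, as ReadConf combines the results today:
should_read = dict_filter_or() and bloom_filter_or()
print(should_read)  # True: the row group is read anyway

# Combined per-column evaluation: SRC_IP is pruned by the dictionary,
# DST_IP by the bloom filter, and only then is the OR applied.
src_verdict = CANNOT_MATCH
dst_verdict = CANNOT_MATCH
print(src_verdict or dst_verdict)  # False: the row group can be skipped
```

Each filter's OR-result is True only because it has no information about one of the two columns; AND-ing the per-filter results cannot recover the per-column evidence.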

To prove my hypothesis I created a new dataset where I set the write.parquet.dict-size-bytes very low to make sure it does not use the dictionary encoding. Since the columns are not dictionary encoded they use a bloom filter.

The results here show that when both the SRC_IP and DST_IP columns use blooms, it is fast to evaluate the OR condition: 10 seconds compared to 379 seconds.

('R:128MB no dict', 'SRC', 8.311380386352539)
('R:128MB', 'DST', 0.7780213356018066)
('R:128MB no dict', 'DST', 0.8464977741241455)
('R:128MB', 'AND', 0.5989606380462646)
('R:128MB no dict', 'AND', 0.7306745052337646)
('R:128MB', 'OR', 379.92671513557434)
('R:128MB no dict', 'OR', 10.23964548110962)

I propose that the ParquetDictionaryRowGroupFilter and ParquetBloomRowGroupFilter be combined into a single evaluator that tests every column. Each column would be tested either using a dictionary or a bloom filter.

@huaxingao @RussellSpitzer

@cccs-jc cccs-jc added the bug Something isn't working label Mar 22, 2024
@huaxingao
Contributor

@cccs-jc Thanks a lot for your thorough investigation and analysis!

The problem you described will also occur without a bloom filter. Let's use the where clause col1=1 OR col2=1. Assume the minimum for col1 is 0 and the maximum is 5, while the minimum for col2 is 2 and the maximum is 5. Let's also assume we do not have a bloom filter, col1 is dictionary encoded, and col2 is not.

The statsFilter will determine that col1 with value 1 is within the range 0 to 5, so for col1=1 it returns shouldRead = True. For col2=1 it returns shouldRead = False, because value 1 is outside the range 2 to 5. So the statsFilter returns shouldRead = True for col1=1 OR col2=1.

The dictFilter will determine that the value 1 is not in the dictionary of col1, so it returns shouldRead=False. Then when dictFilter evaluates col2=1, it returns shouldRead=True because there is no dictionary so it can't rule it out. The dictFilter returns shouldRead = True for col1=1 OR col2=1.

Since both the statsFilter and dictFilter return True for col1=1 OR col2=1, we can't skip reading the row group.

It would be ideal if we could combine the shouldRead=False for col2=1 in the statsFilter and the shouldRead=False for col1=1 in the dictFilter, but there doesn't seem to be an easy way to do so.

@cccs-jc
Contributor Author

cccs-jc commented Mar 23, 2024

@huaxingao You are absolutely correct; the issue arises also when combining the statsFilter with the dictFilter. It's essentially the same underlying problem.

The crux of the issue lies in the evaluation of conditions such as col1=1 OR col2=1, where the complete expression is evaluated by the statsFilter, then again by the dictFilter, then again by the bloomFilter, and the three results are simply AND-ed together.

Ideally, for every column, we would want to combine the information from stats, dict and bloom. This sounds like a "Chain of Responsibility."

So instead of the current implementation:

statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive);

boolean shouldRead =
          filter == null
              || (statsFilter.shouldRead(typeWithIds, rowGroup)
                  && dictFilter.shouldRead(
                      typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup))
                  && bloomFilter.shouldRead(
                      typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup)));

I propose the following:

statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
bloomFilter = new ParquetBloomRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter.setDict(reader.getDictionaryReader(rowGroup));
bloomFilter.setBloom(reader.getBloomFilterDataReader(rowGroup));
					  
// Chain statsFilter -> bloomFilter -> dictFilter
bloomFilter.setNext(dictFilter);
statsFilter.setNext(bloomFilter);

boolean shouldRead =
          filter == null
              || statsFilter.shouldRead(typeWithIds, rowGroup);

In this setup, when the statsFilter evaluates an expression like eq and determines that ROWS_CANNOT_MATCH, it returns immediately. However, if it determines ROWS_MIGHT_MATCH, it passes on the request to the next filter (bloomFilter). The bloomFilter then tests against the bloom information. If it determines ROWS_CANNOT_MATCH, it returns immediately. But if it determines ROWS_MIGHT_MATCH, it then asks the next filter (dictFilter).

This approach effectively combines all the information (metrics, bloom, dict) for a given col1 and accurately determines whether ROWS_CANNOT_MATCH or ROWS_MIGHT_MATCH for col1=1.

Basically the chain is like this: statsFilter.eq(col1=1) -> bloomFilter.eq(col1=1) -> dictFilter.eq(col1=1)

It does the same with col2=1.

Finally, the results for every column (col1, col2) can be combined using the public Boolean or(Boolean leftResult, Boolean rightResult) method. The or method does not need to be delegated since it does not test against metrics, dict, or bloom.
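The chained evaluation described above can be sketched as follows (a hypothetical Python sketch; set_next/eq and the per-column verdict tables are illustrative, not Iceberg's API):

```python
# Hypothetical chain-of-responsibility sketch, not Iceberg code.

ROWS_CANNOT_MATCH, ROWS_MIGHT_MATCH = False, True

class ChainedFilter:
    def __init__(self, verdicts):
        # verdicts: the answer this filter can give on its own per column;
        # a missing column means "no information, delegate to the next filter"
        self.verdicts = verdicts
        self.next_filter = None

    def set_next(self, nxt):
        self.next_filter = nxt
        return nxt

    def eq(self, column):
        if self.verdicts.get(column) is ROWS_CANNOT_MATCH:
            return ROWS_CANNOT_MATCH            # definitive: stop the chain
        if self.next_filter is not None:
            return self.next_filter.eq(column)  # delegate down the chain
        return ROWS_MIGHT_MATCH                 # end of chain: can't rule out

# Stats can't prune either column; the bloom prunes DST_IP=1; the
# dictionary prunes SRC_IP=1 (as in the SRC_IP/DST_IP example earlier).
stats = ChainedFilter({})
bloom = ChainedFilter({"DST_IP": ROWS_CANNOT_MATCH})
dictf = ChainedFilter({"SRC_IP": ROWS_CANNOT_MATCH})
stats.set_next(bloom).set_next(dictf)

# The or() combination applies to the fully resolved per-column verdicts:
should_read = stats.eq("SRC_IP") or stats.eq("DST_IP")
print(should_read)  # False -> the row group is skipped
```

Because each leaf predicate is resolved through the whole chain before or() runs, every piece of metadata gets a chance to prune each column.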

Does this seem reasonable?

@huaxingao
Contributor

@cccs-jc Thanks for your proposal!

For filter col1=1 || col2=1, the current implementation is:

shouldRead = statsFilter(col1=1 || col2=1) && dictFilter(col1=1 || col2=1) && bloomFilter(col1=1 || col2=1)

Your suggested modification appears to be:

shouldRead = (statsFilter(col1=1) && dictFilter(col1=1) && bloomFilter(col1=1))
    || (statsFilter(col2=1) && dictFilter(col2=1) && bloomFilter(col2=1))

This approach seems logically sound. However, the change you proposed may require modifications to the current predicate evaluation model. I suspect this could necessitate quite a few adjustments.

@zhongyujiang
Contributor

@cccs-jc @huaxingao I've run into the same issue before: because the three row-group filters cannot work together, some query expressions containing OR cannot filter out data. I have done some work to solve this problem in #6893, which draws on the idea of ResidualEvaluator to let the three row-group filters work together.
Any comments are welcome; if the community agrees with this solution, I can resolve the conflicts and revive that PR.

@cccs-jc
Contributor Author

cccs-jc commented Mar 25, 2024

This weekend I fixed the issue with the three row-group filters not working together. The results are quite impressive: 11 seconds vs. 396.

-----------------results-4CPU-OR-fixed------------------
('R:128MB', 'SRC', 11.399899959564209)
('R:128MB', 'DST', 0.7222678661346436)
('R:128MB', 'AND', 0.6006526947021484)
('R:128MB', 'OR', 11.477725505828857)

-----------------results-4CPU------------------
('R:128MB', 'SRC', 13.441139459609985)
('R:128MB', 'DST', 1.1408600807189941)
('R:128MB', 'AND', 0.9586172103881836)
('R:128MB', 'OR', 396.9800181388855)

What I did is implement a new ParquetCombinedRowGroupFilter, which takes the ParquetMetricsRowGroupFilter, ParquetDictionaryRowGroupFilter, and ParquetBloomRowGroupFilter and applies them like so:

    @Override
    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
      return visitors.stream().allMatch(v -> v.eq(ref, lit) == ROWS_MIGHT_MATCH);
    }

For every column it sequentially tests the metrics, dictionary, and bloom; if all of them return ROWS_MIGHT_MATCH, then shouldRead=True is returned.
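A rough Python analogue of this combined evaluation (hypothetical toy visitors standing in for the metrics, dictionary, and bloom visitors — not the branch's actual code):

```python
# Illustrative sketch of the combined per-leaf evaluation, not Iceberg code.

ROWS_MIGHT_MATCH, ROWS_CANNOT_MATCH = True, False

def combined_eq(visitors, ref, lit):
    # mirrors: visitors.stream().allMatch(v -> v.eq(ref, lit) == ROWS_MIGHT_MATCH)
    return all(v(ref, lit) == ROWS_MIGHT_MATCH for v in visitors)

# SRC_IP=1 OR DST_IP=1: the dictionary rules out SRC_IP=1 and the bloom
# filter rules out DST_IP=1; metrics can't rule out either.
metrics = lambda ref, lit: ROWS_MIGHT_MATCH
dictionary = lambda ref, lit: ROWS_CANNOT_MATCH if ref == "SRC_IP" else ROWS_MIGHT_MATCH
bloom = lambda ref, lit: ROWS_CANNOT_MATCH if ref == "DST_IP" else ROWS_MIGHT_MATCH
visitors = [metrics, dictionary, bloom]

src = combined_eq(visitors, "SRC_IP", 1)  # False: dictionary prunes it
dst = combined_eq(visitors, "DST_IP", 1)  # False: bloom prunes it
print(src or dst)  # False -> the row group is skipped for the OR
```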

Here is my dev branch: https://github.com/CybercentreCanada/iceberg/tree/parquet_column_eval
I still have to write a unit test to show that OR statements are now applied properly. I'll make a PR and we can compare notes.

@zhongyujiang I'll have a look at your PR today.

@amogh-jahagirdar
Contributor

amogh-jahagirdar commented Mar 25, 2024

I've been following this thread, and after thinking about the proposed solution and going through the code a bit more, I think @cccs-jc's approach is logically sound. This is very promising @cccs-jc, I'll take a look at the PR. This is one where we'll want to be sure of good coverage via test cases, since it's on the critical path for correctness.

Also, I noticed you raised it to the 1.5 release branch, when you get a chance could you raise it to main instead?

@cccs-jc
Contributor Author

cccs-jc commented Mar 26, 2024

@amogh-jahagirdar I'm going to apply this patch to our internal deployment of Iceberg 1.5 and will likely run with it for a while.

At the same time I will create a PR to the main branch of the official Iceberg repo.

I agree this needs good testing. I have started writing unit tests specifically for this.

Luckily I did not have to modify the metrics, dictionary and bloom visitor too much. The biggest change is in the way these are combined.

@huaxingao @RussellSpitzer @zhongyujiang are we in agreement with the approach taken in this patch: https://github.com/CybercentreCanada/iceberg/tree/parquet_column_eval? I think this is a substantial improvement over the current implementation.

@zhongyujiang
Contributor

zhongyujiang commented Mar 26, 2024

@cccs-jc @amogh-jahagirdar
I also agree that this approach logically solves the issue here. However, I would like to explain why I used a more complex approach to solve this problem in #6893:

We know that currently, the metric filter, dict filter, and bloom filter are evaluated in sequential order, with short-circuiting logic. This is because the metric filter incurs very little overhead, as it utilizes stats information stored in the Parquet footer. However, the dict filter needs to read the dict page in the row group, resulting in additional I/O and decoding overhead. In the current logic, if the evaluation result of the metric filter is false, there is no need to evaluate the dict filter.

If we use ParquetCombinedRowGroupFilter, for certain expressions, even if the metric filter evaluates to false, the dict filter will still be invoked, resulting in additional overhead. For example, if we have an expression like 'foo = 5 OR bar = 5', even if the metric filter evaluates both sub-expressions to false, the dict filter will still be called to read the dict pages for evaluation.

That's why I used the residual evaluator in #6893. It still allows sequential invocation of the three filters, and the short-circuiting logic can still take effect. The drawback is that I had to transform the three filters into the form of a residual evaluator.
Update: hmm, just realized that the reading of dict pages is done in a lazy mode, so I believe my conclusion is invalid.

Another advantage of using the residual evaluator is that the computed residual expression can enhance the filtering effect of subsequent Parquet page filters, I mean, if we were to support page filters in the future.

@cccs-jc
Contributor Author

cccs-jc commented Mar 26, 2024

@zhongyujiang I double checked, and yes, the ParquetDictionaryRowGroupFilter lazily loads the dictionary data. I also checked the ParquetBloomRowGroupFilter, and it's the same thing: it lazily loads the bloom filters. So it looks like there is little impact to using ParquetCombinedRowGroupFilter.

@zhongyujiang
Contributor

> If we use ParquetCombinedRowGroupFilter, for certain expressions, even if the metric filter evaluates to false, the dict filter will still be invoked, resulting in additional overhead. For example, if we have an expression like 'foo = 5 OR bar = 5', even if the metric filter evaluates both sub-expressions to false, the dict filter will still be called to read the dict pages for evaluation.
>
> That's why I used the residual evaluator in #6893. It still allows sequential invocation of the three filters, and the short-circuiting logic can still take effect. The drawback is that I had to transform the three filters into the form of a residual evaluator.
> Update: hmm, just realized that the reading of dict pages is done in a lazy mode, so I believe my conclusion is invalid.

@cccs-jc After further recollection, I believe I now remember the exact reason why I initially chose the residual filter: the scenario where the combined filter may incur additional overhead is a query like 'foo=5 AND bar=5'. If the metric filter evaluates 'foo=5' as true but 'bar=5' as false, the subsequent filters can be skipped entirely. With a combined filter, however, the dict filter would still be invoked to evaluate 'foo=5'.
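The extra work can be sketched by counting hypothetical dictionary-page reads under each strategy (illustrative Python, not Iceberg code):

```python
# Hypothetical sketch counting dictionary-page reads for foo=5 AND bar=5.

dict_reads = {"n": 0}

def metric_eq(col):
    # stats: foo=5 might match, bar=5 cannot match
    return col == "foo"

def dict_eq(col):
    dict_reads["n"] += 1  # stands in for reading and decoding a dict page
    return True

# Sequential strategy: the metric filter evaluates the whole expression
# first; bar=5 cannot match, so the AND is false and the dict filter is
# never consulted at all.
if metric_eq("foo") and metric_eq("bar"):
    dict_eq("foo") and dict_eq("bar")
sequential_reads = dict_reads["n"]  # 0

# Combined strategy: each leaf is resolved through all filters before the
# AND is applied, so foo=5 still consults the dictionary even though
# bar=5 already decides the outcome.
dict_reads["n"] = 0
foo = metric_eq("foo") and dict_eq("foo")   # dictionary consulted
bar = metric_eq("bar") and dict_eq("bar")   # short-circuits on the metric
combined_should_read = foo and bar          # False either way
print(sequential_reads, dict_reads["n"])    # 0 1
```

Both strategies reach the same shouldRead=False verdict; they differ only in how much metadata they touch along the way (though, as noted above, lazy loading softens this cost).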

@cccs-jc
Contributor Author

cccs-jc commented Mar 26, 2024

@zhongyujiang I agree using the residual approach is more efficient. I took a closer look at your code and it's starting to make sense to me. I vote for your implementation.

@rdblue This is an important improvement for us. Could you give your opinion on PR #6893 and my branch https://github.com/CybercentreCanada/iceberg/tree/parquet_column_eval. Thank you

@cccs-jc changed the title from "Bloom filter not properly leveraged when using an OR condition" to "OR condition does not leverage all parquet metadata (metrics, dictionary, bloom filter) causing inefficient queries" on Mar 28, 2024