Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Make row-group filters cooperate to filter #10090

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -602,4 +602,20 @@ private static <R> Supplier<R> visitExpr(
}
}
}

/**
 * A {@link BoundExpressionVisitor} whose evaluation result is itself an {@link Expression}:
 * either a definitive answer ({@code ROWS_ALL_MATCH} / {@code ROWS_CANNOT_MATCH}) or a residual
 * predicate that downstream filters must still evaluate.
 *
 * <p>Subclasses signal "undecided" by returning {@code ROWS_MIGHT_MATCH} ({@code null}) from the
 * per-operation callbacks; this visitor then substitutes the original predicate as the residual.
 */
public abstract static class FindsResidualVisitor extends BoundExpressionVisitor<Expression> {
  // Sentinel results. alwaysFalse()/alwaysTrue() are singletons, so identity comparison is safe.
  protected static final Expression ROWS_CANNOT_MATCH = Expressions.alwaysFalse();
  protected static final Expression ROWS_ALL_MATCH = Expressions.alwaysTrue();
  protected static final Expression ROWS_MIGHT_MATCH = null;

  @Override
  public <T> Expression predicate(BoundPredicate<T> pred) {
    Expression outcome = super.predicate(pred);
    // A definitive verdict propagates unchanged; anything else (including the null
    // ROWS_MIGHT_MATCH sentinel) means this predicate survives as a residual.
    boolean decided = outcome == ROWS_CANNOT_MATCH || outcome == ROWS_ALL_MATCH;
    return decided ? outcome : pred;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.expressions.ExpressionVisitors.FindsResidualVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -59,14 +59,19 @@ public class ParquetBloomRowGroupFilter {
private final Expression expr;
private final boolean caseSensitive;

public ParquetBloomRowGroupFilter(Schema schema, Expression unbound) {
this(schema, unbound, true);
public ParquetBloomRowGroupFilter(Schema schema, Expression expr) {
this(schema, expr, true);
}

public ParquetBloomRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
public ParquetBloomRowGroupFilter(Schema schema, Expression expr, boolean caseSensitive) {
this.schema = schema;
StructType struct = schema.asStruct();
this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive);
if (Binder.isBound(expr)) {
this.expr = Expressions.rewriteNot(expr);
} else {
this.expr = Binder.bind(struct, Expressions.rewriteNot(expr), caseSensitive);
}

this.caseSensitive = caseSensitive;
}

Expand All @@ -80,21 +85,23 @@ public ParquetBloomRowGroupFilter(Schema schema, Expression unbound, boolean cas
*/
public boolean shouldRead(
MessageType fileSchema, BlockMetaData rowGroup, BloomFilterReader bloomReader) {
return new BloomEvalVisitor().eval(fileSchema, rowGroup, bloomReader);
return residualFor(fileSchema, rowGroup, bloomReader) != Expressions.alwaysFalse();
}

private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;
public Expression residualFor(
MessageType fileSchema, BlockMetaData rowGroup, BloomFilterReader bloomReader) {
return new BloomEvalVisitor().eval(fileSchema, rowGroup, bloomReader);
}

private class BloomEvalVisitor extends BoundExpressionVisitor<Boolean> {
private class BloomEvalVisitor extends FindsResidualVisitor {
private BloomFilterReader bloomReader;
private Set<Integer> fieldsWithBloomFilter = null;
private Map<Integer, ColumnChunkMetaData> columnMetaMap = null;
private Map<Integer, BloomFilter> bloomCache = null;
private Map<Integer, PrimitiveType> parquetPrimitiveTypes = null;
private Map<Integer, Type> types = null;

private boolean eval(
private Expression eval(
MessageType fileSchema, BlockMetaData rowGroup, BloomFilterReader bloomFilterReader) {
this.bloomReader = bloomFilterReader;
this.fieldsWithBloomFilter = Sets.newHashSet();
Expand Down Expand Up @@ -122,92 +129,92 @@ private boolean eval(
if (!filterRefs.isEmpty()) {
Set<Integer> overlappedBloomFilters = Sets.intersection(fieldsWithBloomFilter, filterRefs);
if (overlappedBloomFilters.isEmpty()) {
return ROWS_MIGHT_MATCH;
return expr;
} else {
LOG.debug("Using Bloom filters for columns with IDs: {}", overlappedBloomFilters);
}
}

return ExpressionVisitors.visitEvaluator(expr, this);
return ExpressionVisitors.visit(expr, this);
}

@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH; // all rows match
public Expression alwaysTrue() {
return ROWS_ALL_MATCH; // all rows match
}

@Override
public Boolean alwaysFalse() {
public Expression alwaysFalse() {
return ROWS_CANNOT_MATCH; // all rows fail
}

@Override
public Boolean not(Boolean result) {
public Expression not(Expression result) {
// not() should be rewritten by RewriteNot
// bloom filter is based on hash and cannot eliminate based on not
throw new UnsupportedOperationException("This path shouldn't be reached.");
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
public Expression and(Expression leftResult, Expression rightResult) {
return Expressions.and(leftResult, rightResult);
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
public Expression or(Expression leftResult, Expression rightResult) {
return Expressions.or(leftResult, rightResult);
}

@Override
public <T> Boolean isNull(BoundReference<T> ref) {
public <T> Expression isNull(BoundReference<T> ref) {
// bloom filter only contain non-nulls and cannot eliminate based on isNull or NotNull
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notNull(BoundReference<T> ref) {
public <T> Expression notNull(BoundReference<T> ref) {
// bloom filter only contain non-nulls and cannot eliminate based on isNull or NotNull
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
public <T> Expression isNaN(BoundReference<T> ref) {
// bloom filter is based on hash and cannot eliminate based on isNaN or notNaN
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
public <T> Expression notNaN(BoundReference<T> ref) {
// bloom filter is based on hash and cannot eliminate based on isNaN or notNaN
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression lt(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression ltEq(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression gt(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression gtEq(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression eq(BoundReference<T> ref, Literal<T> lit) {
int id = ref.fieldId();
if (!fieldsWithBloomFilter.contains(id)) { // no bloom filter
return ROWS_MIGHT_MATCH;
Expand All @@ -216,17 +223,19 @@ public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
BloomFilter bloom = loadBloomFilter(id);
Type type = types.get(id);
T value = lit.value();
return shouldRead(parquetPrimitiveTypes.get(id), value, bloom, type);
return shouldRead(parquetPrimitiveTypes.get(id), value, bloom, type)
? ROWS_MIGHT_MATCH
: ROWS_CANNOT_MATCH;
}

@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression notEq(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on notEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
public <T> Expression in(BoundReference<T> ref, Set<T> literalSet) {
int id = ref.fieldId();
if (!fieldsWithBloomFilter.contains(id)) { // no bloom filter
return ROWS_MIGHT_MATCH;
Expand All @@ -242,19 +251,19 @@ public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
}

@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
public <T> Expression notIn(BoundReference<T> ref, Set<T> literalSet) {
// bloom filter is based on hash and cannot eliminate based on notIn
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression startsWith(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on startsWith
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression notStartsWith(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on startsWith
return ROWS_MIGHT_MATCH;
}
Expand Down Expand Up @@ -290,7 +299,7 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(((Number) value).intValue());
return bloom.findHash(hashValue);
default:
return ROWS_MIGHT_MATCH;
return true; /* rows might match */
}
case INT64:
switch (type.typeId()) {
Expand All @@ -304,7 +313,7 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(((Number) value).longValue());
return bloom.findHash(hashValue);
default:
return ROWS_MIGHT_MATCH;
return true; /* rows might match */
}
case FLOAT:
hashValue = bloom.hash(((Number) value).floatValue());
Expand Down Expand Up @@ -337,15 +346,15 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(Binary.fromConstantByteArray(UUIDUtil.convert((UUID) value)));
return bloom.findHash(hashValue);
default:
return ROWS_MIGHT_MATCH;
return true; /* rows might match */
}
default:
return ROWS_MIGHT_MATCH;
return true; /* rows might match */
}
}

@Override
public <T> Boolean handleNonReference(Bound<T> term) {
public <T> Expression handleNonReference(Bound<T> term) {
return ROWS_MIGHT_MATCH;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;

/**
 * A row-group filter that chains the metrics, dictionary, and Bloom-filter evaluators, feeding
 * each one the residual expression left over by the previous stage so they cooperate instead of
 * independently re-evaluating the full expression.
 */
public class ParquetCombinedRowGroupFilter {

  private final Schema schema;
  private final Expression expr;
  private final boolean caseSensitive;

  /**
   * Creates a case-sensitive combined filter; see the sibling row-group filters for the same
   * convenience overload.
   *
   * @param schema table schema used to bind the expression
   * @param expr a bound or unbound filter expression
   */
  public ParquetCombinedRowGroupFilter(Schema schema, Expression expr) {
    this(schema, expr, true);
  }

  /**
   * Creates a combined row-group filter.
   *
   * @param schema table schema used to bind the expression
   * @param expr a bound or unbound filter expression
   * @param caseSensitive whether column-name binding is case sensitive
   */
  public ParquetCombinedRowGroupFilter(Schema schema, Expression expr, boolean caseSensitive) {
    this.schema = schema;
    Types.StructType struct = schema.asStruct();
    // Accept either bound or unbound expressions; rewriteNot pushes negation down so the
    // stage visitors never see a NOT node.
    if (Binder.isBound(expr)) {
      this.expr = Expressions.rewriteNot(expr);
    } else {
      this.expr = Binder.bind(struct, Expressions.rewriteNot(expr), caseSensitive);
    }

    this.caseSensitive = caseSensitive;
  }

  /**
   * Test whether the file may contain records that match the expression.
   *
   * @param fileSchema schema for the Parquet file
   * @param rowGroup metadata for a row group
   * @param reader file reader for the Parquet file
   * @return false if the file cannot contain rows that match the expression, true otherwise.
   */
  public boolean shouldRead(
      MessageType fileSchema, BlockMetaData rowGroup, ParquetFileReader reader) {
    // Expressions.alwaysFalse() returns a singleton, so identity comparison is safe here.
    return residualFor(fileSchema, rowGroup, reader) != Expressions.alwaysFalse();
  }

  /**
   * Computes the residual expression for a row group after applying, in order, the metrics,
   * dictionary, and Bloom-filter evaluators. Each stage receives the previous stage's residual;
   * evaluation short-circuits as soon as a stage returns a definitive alwaysTrue/alwaysFalse.
   *
   * @param fileSchema schema for the Parquet file
   * @param rowGroup metadata for a row group
   * @param reader file reader for the Parquet file, used to load dictionaries and Bloom filters
   * @return alwaysFalse if no rows can match, alwaysTrue if all rows match, otherwise the
   *     remaining predicate that must be evaluated against individual rows
   */
  public Expression residualFor(
      MessageType fileSchema, BlockMetaData rowGroup, ParquetFileReader reader) {
    ParquetMetricsRowGroupFilter metricFilter =
        new ParquetMetricsRowGroupFilter(schema, expr, caseSensitive);
    Expression metricResidual = metricFilter.residualFor(fileSchema, rowGroup);
    // alwaysTrue()/alwaysFalse() are singletons, so identity comparison is safe.
    if (metricResidual == Expressions.alwaysFalse() || metricResidual == Expressions.alwaysTrue()) {
      return metricResidual;
    }

    ParquetDictionaryRowGroupFilter dictFilter =
        new ParquetDictionaryRowGroupFilter(schema, metricResidual, caseSensitive);

    Expression dictResidual =
        dictFilter.residualFor(fileSchema, rowGroup, reader.getDictionaryReader(rowGroup));

    if (dictResidual == Expressions.alwaysFalse() || dictResidual == Expressions.alwaysTrue()) {
      return dictResidual;
    }

    ParquetBloomRowGroupFilter bloomFilter =
        new ParquetBloomRowGroupFilter(schema, dictResidual, caseSensitive);
    return bloomFilter.residualFor(fileSchema, rowGroup, reader.getBloomFilterDataReader(rowGroup));
  }
}
Loading