Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Make row-group filters cooperate to filter #10090

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Parquet: Make row-group filters cooperate to filter.
  • Loading branch information
zhongyujiang committed Jun 30, 2024
commit 7e975458ddc35b975abac22330f6104b259a0200
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public class ExpressionVisitors {

private ExpressionVisitors() {}

/**
 * Marker contract for visitors that want the operands of AND/OR handed over as suppliers instead
 * of already-computed results.
 *
 * <p>When a visitor passed to {@code visit(Expression, ExpressionVisitor)} implements this
 * interface, the AND and OR cases call these methods with suppliers that visit the left and right
 * child only when {@code get()} is invoked, so an implementation can skip a child entirely.
 *
 * @param <R> the result type produced by the visitor
 */
public interface SupportsLazyEvaluation<R> {
/**
 * Combines the two operands of an AND expression.
 *
 * @param left supplier that visits the left child when invoked
 * @param right supplier that visits the right child when invoked
 * @return the visitor result for the AND expression
 */
R and(Supplier<R> left, Supplier<R> right);

/**
 * Combines the two operands of an OR expression.
 *
 * @param left supplier that visits the left child when invoked
 * @param right supplier that visits the right child when invoked
 * @return the visitor result for the OR expression
 */
R or(Supplier<R> left, Supplier<R> right);
}

public abstract static class ExpressionVisitor<R> {
public R alwaysTrue() {
return null;
Expand Down Expand Up @@ -363,10 +369,20 @@ public static <R> R visit(Expression expr, ExpressionVisitor<R> visitor) {
return visitor.not(visit(not.child(), visitor));
case AND:
And and = (And) expr;
return visitor.and(visit(and.left(), visitor), visit(and.right(), visitor));
if (visitor instanceof SupportsLazyEvaluation) {
return ((SupportsLazyEvaluation<R>) visitor)
.and(() -> visit(and.left(), visitor), () -> visit(and.right(), visitor));
} else {
return visitor.and(visit(and.left(), visitor), visit(and.right(), visitor));
}
case OR:
Or or = (Or) expr;
return visitor.or(visit(or.left(), visitor), visit(or.right(), visitor));
if (visitor instanceof SupportsLazyEvaluation) {
return ((SupportsLazyEvaluation<R>) visitor)
.or(() -> visit(or.left(), visitor), () -> visit(or.right(), visitor));
} else {
return visitor.or(visit(or.left(), visitor), visit(or.right(), visitor));
}
default:
throw new UnsupportedOperationException("Unknown operation: " + expr.op());
}
Expand Down Expand Up @@ -602,4 +618,21 @@ private static <R> Supplier<R> visitExpr(
}
}
}

/**
 * Base class for bound-expression visitors whose result is a residual {@link Expression} rather
 * than a boolean.
 *
 * <p>Per-predicate handlers report one of three outcomes: {@link #ROWS_CANNOT_MATCH}, {@link
 * #ROWS_ALL_MATCH}, or {@link #ROWS_MIGHT_MATCH}. {@link #predicate(BoundPredicate)} keeps the two
 * constant outcomes and substitutes the original predicate for anything else, so an undecided
 * predicate stays in the residual.
 */
public abstract static class FindsResidualVisitor extends BoundExpressionVisitor<Expression>
    implements SupportsLazyEvaluation<Expression> {
  /** Outcome meaning no row can satisfy the predicate. */
  protected static final Expression ROWS_CANNOT_MATCH = Expressions.alwaysFalse();

  /** Outcome meaning every row satisfies the predicate. */
  protected static final Expression ROWS_ALL_MATCH = Expressions.alwaysTrue();

  /** Outcome meaning the predicate could not be decided; deliberately {@code null}. */
  protected static final Expression ROWS_MIGHT_MATCH = null;

  /**
   * Evaluates a bound predicate through the superclass and converts the outcome into a residual.
   *
   * @param pred the bound predicate being visited
   * @return {@link #ROWS_CANNOT_MATCH} or {@link #ROWS_ALL_MATCH} when the superclass evaluation
   *     produced that exact constant, otherwise {@code pred} itself
   */
  @Override
  public <T> Expression predicate(BoundPredicate<T> pred) {
    Expression evaluated = super.predicate(pred);
    // Identity comparison is intentional: only the two shared constants count as decided.
    boolean decided = evaluated == ROWS_CANNOT_MATCH || evaluated == ROWS_ALL_MATCH;
    return decided ? evaluated : pred;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -952,6 +954,31 @@ public void testTransformFilter() {
.isTrue();
}

/**
 * Verifies the residual returned by {@code ParquetMetricsRowGroupFilter.residualFor}: an OR with a
 * branch that cannot match leaves only the undecided branch (in bound form), and an AND with a
 * branch that cannot match collapses to {@code alwaysFalse}.
 */
@Test
public void testParquetFindsResidual() {
  Assume.assumeTrue("Only valid for Parquet", format == FileFormat.PARQUET);

  Expression mightMatch = notEqual("some_nulls", "some");
  Expression cannotMatch = equal("id", INT_MIN_VALUE - 25);

  // OR: the branch that cannot match drops out, leaving the bound form of the other branch.
  Expression expected = Binder.bind(SCHEMA.asStruct(), Expressions.rewriteNot(mightMatch), true);
  ParquetMetricsRowGroupFilter filter =
      new ParquetMetricsRowGroupFilter(SCHEMA, Expressions.or(mightMatch, cannotMatch), true);
  Expression actual = filter.residualFor(parquetSchema, rowGroupMetadata);

  // %s formats null as "null", so the arguments are passed directly instead of via toString().
  Assert.assertTrue(
      String.format("Expected: %s, actual: %s", expected, actual),
      actual.isEquivalentTo(expected));

  // AND: a single branch that cannot match makes the whole residual alwaysFalse.
  filter =
      new ParquetMetricsRowGroupFilter(SCHEMA, Expressions.and(mightMatch, cannotMatch), true);
  expected = Expressions.alwaysFalse();
  actual = filter.residualFor(parquetSchema, rowGroupMetadata);
  Assert.assertTrue(
      String.format("Expected: %s, actual: %s", expected, actual),
      actual.isEquivalentTo(expected));
}

private boolean shouldRead(Expression expression) {
return shouldRead(expression, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Bound;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.expressions.ExpressionVisitors.FindsResidualVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -59,14 +60,19 @@ public class ParquetBloomRowGroupFilter {
private final Expression expr;
private final boolean caseSensitive;

public ParquetBloomRowGroupFilter(Schema schema, Expression unbound) {
this(schema, unbound, true);
public ParquetBloomRowGroupFilter(Schema schema, Expression expr) {
this(schema, expr, true);
}

public ParquetBloomRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
public ParquetBloomRowGroupFilter(Schema schema, Expression expr, boolean caseSensitive) {
this.schema = schema;
StructType struct = schema.asStruct();
this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive);
if (Binder.isBound(expr)) {
this.expr = Expressions.rewriteNot(expr);
} else {
this.expr = Binder.bind(struct, Expressions.rewriteNot(expr), caseSensitive);
}

this.caseSensitive = caseSensitive;
}

Expand All @@ -80,21 +86,23 @@ public ParquetBloomRowGroupFilter(Schema schema, Expression unbound, boolean cas
*/
public boolean shouldRead(
MessageType fileSchema, BlockMetaData rowGroup, BloomFilterReader bloomReader) {
return new BloomEvalVisitor().eval(fileSchema, rowGroup, bloomReader);
return residualFor(fileSchema, rowGroup, bloomReader) != Expressions.alwaysFalse();
}

private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;
/**
 * Evaluates this filter's expression for one row group with a fresh {@code BloomEvalVisitor} and
 * returns the resulting expression.
 *
 * <p>{@code shouldRead} treats a result of {@code Expressions.alwaysFalse()} as "do not read this
 * row group"; any other result means the row group should be read.
 *
 * @param fileSchema Parquet schema handed to the visitor (presumably the data file's schema)
 * @param rowGroup metadata of the row group being evaluated
 * @param bloomReader reader the visitor uses to access bloom filters
 * @return the expression produced by the visitor for this row group
 */
public Expression residualFor(
MessageType fileSchema, BlockMetaData rowGroup, BloomFilterReader bloomReader) {
return new BloomEvalVisitor().eval(fileSchema, rowGroup, bloomReader);
}

private class BloomEvalVisitor extends BoundExpressionVisitor<Boolean> {
private class BloomEvalVisitor extends FindsResidualVisitor {
private BloomFilterReader bloomReader;
private Set<Integer> fieldsWithBloomFilter = null;
private Map<Integer, ColumnChunkMetaData> columnMetaMap = null;
private Map<Integer, BloomFilter> bloomCache = null;
private Map<Integer, PrimitiveType> parquetPrimitiveTypes = null;
private Map<Integer, Type> types = null;

private boolean eval(
private Expression eval(
MessageType fileSchema, BlockMetaData rowGroup, BloomFilterReader bloomFilterReader) {
this.bloomReader = bloomFilterReader;
this.fieldsWithBloomFilter = Sets.newHashSet();
Expand Down Expand Up @@ -122,92 +130,102 @@ private boolean eval(
if (!filterRefs.isEmpty()) {
Set<Integer> overlappedBloomFilters = Sets.intersection(fieldsWithBloomFilter, filterRefs);
if (overlappedBloomFilters.isEmpty()) {
return ROWS_MIGHT_MATCH;
return expr;
} else {
LOG.debug("Using Bloom filters for columns with IDs: {}", overlappedBloomFilters);
}
}

return ExpressionVisitors.visitEvaluator(expr, this);
return ExpressionVisitors.visit(expr, this);
}

@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH; // all rows match
public Expression alwaysTrue() {
return ROWS_ALL_MATCH; // all rows match
}

@Override
public Boolean alwaysFalse() {
public Expression alwaysFalse() {
return ROWS_CANNOT_MATCH; // all rows fail
}

@Override
public Boolean not(Boolean result) {
public Expression not(Expression result) {
// not() should be rewritten by RewriteNot
// bloom filter is based on hash and cannot eliminate based on not
throw new UnsupportedOperationException("This path shouldn't be reached.");
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
public Expression and(Supplier<Expression> left, Supplier<Expression> right) {
  // The left operand is always evaluated; the right supplier is invoked only when the left
  // side has not already ruled the row group out.
  Expression lhs = left.get();
  return lhs == ROWS_CANNOT_MATCH ? lhs : Expressions.and(lhs, right.get());
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
public Expression or(Supplier<Expression> left, Supplier<Expression> right) {
  // The left operand is always evaluated; the right supplier is invoked only when the left
  // side has not already established that all rows match.
  Expression lhs = left.get();
  return lhs == ROWS_ALL_MATCH ? lhs : Expressions.or(lhs, right.get());
}

@Override
public <T> Boolean isNull(BoundReference<T> ref) {
public <T> Expression isNull(BoundReference<T> ref) {
// bloom filter only contain non-nulls and cannot eliminate based on isNull or NotNull
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notNull(BoundReference<T> ref) {
public <T> Expression notNull(BoundReference<T> ref) {
// bloom filter only contain non-nulls and cannot eliminate based on isNull or NotNull
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
public <T> Expression isNaN(BoundReference<T> ref) {
// bloom filter is based on hash and cannot eliminate based on isNaN or notNaN
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
public <T> Expression notNaN(BoundReference<T> ref) {
// bloom filter is based on hash and cannot eliminate based on isNaN or notNaN
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression lt(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression ltEq(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression gt(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression gtEq(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on lt or ltEq or gt or gtEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression eq(BoundReference<T> ref, Literal<T> lit) {
int id = ref.fieldId();
if (!fieldsWithBloomFilter.contains(id)) { // no bloom filter
return ROWS_MIGHT_MATCH;
Expand All @@ -216,17 +234,19 @@ public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
BloomFilter bloom = loadBloomFilter(id);
Type type = types.get(id);
T value = lit.value();
return shouldRead(parquetPrimitiveTypes.get(id), value, bloom, type);
return shouldRead(parquetPrimitiveTypes.get(id), value, bloom, type)
? ROWS_MIGHT_MATCH
: ROWS_CANNOT_MATCH;
}

@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression notEq(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on notEq
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
public <T> Expression in(BoundReference<T> ref, Set<T> literalSet) {
int id = ref.fieldId();
if (!fieldsWithBloomFilter.contains(id)) { // no bloom filter
return ROWS_MIGHT_MATCH;
Expand All @@ -242,19 +262,19 @@ public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
}

@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
public <T> Expression notIn(BoundReference<T> ref, Set<T> literalSet) {
// bloom filter is based on hash and cannot eliminate based on notIn
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression startsWith(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on startsWith
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
public <T> Expression notStartsWith(BoundReference<T> ref, Literal<T> lit) {
// bloom filter is based on hash and cannot eliminate based on startsWith
return ROWS_MIGHT_MATCH;
}
Expand Down Expand Up @@ -290,7 +310,7 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(((Number) value).intValue());
return bloom.findHash(hashValue);
default:
return ROWS_MIGHT_MATCH;
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is more readable to keep the constants:

Suggested change
return true;
return ROWS_MIGHT_MATCH;

A few more occurrences below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because these two constants are no longer `boolean`: they are now `Expression` values defined in `FindsResidualVisitor`, so they cannot be returned from this `boolean` helper.

For readability, maybe we can add a comment after the boolean value?

return true; /* rows might match */

}
case INT64:
switch (type.typeId()) {
Expand All @@ -304,7 +324,7 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(((Number) value).longValue());
return bloom.findHash(hashValue);
default:
return ROWS_MIGHT_MATCH;
return true;
}
case FLOAT:
hashValue = bloom.hash(((Number) value).floatValue());
Expand Down Expand Up @@ -337,15 +357,15 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(Binary.fromConstantByteArray(UUIDUtil.convert((UUID) value)));
return bloom.findHash(hashValue);
default:
return ROWS_MIGHT_MATCH;
return true;
}
default:
return ROWS_MIGHT_MATCH;
return true;
}
}

@Override
public <T> Boolean handleNonReference(Bound<T> term) {
public <T> Expression handleNonReference(Bound<T> term) {
return ROWS_MIGHT_MATCH;
}
}
Expand Down
Loading