Improve tests.
zhongyujiang committed Jun 29, 2024
1 parent 86e6b7d commit dd70a4c
Showing 6 changed files with 316 additions and 100 deletions.
25 changes: 0 additions & 25 deletions data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -56,9 +56,7 @@
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
@@ -953,29 +951,6 @@ public void testTransformFilter() {
.isTrue();
}

-  @Test
-  public void testParquetFindsResidual() {
-    Assumptions.assumeThat(format).isEqualTo(FileFormat.PARQUET);
-
-    Expression mightMatch = notEqual("some_nulls", "some");
-    Expression cannotMatch = equal("id", INT_MIN_VALUE - 25);
-
-    Expression expected = Binder.bind(SCHEMA.asStruct(), Expressions.rewriteNot(mightMatch), true);
-    ParquetMetricsRowGroupFilter filter =
-        new ParquetMetricsRowGroupFilter(SCHEMA, Expressions.or(mightMatch, cannotMatch), true);
-    Expression actual = filter.residualFor(parquetSchema, rowGroupMetadata);
-
-    Assertions.assertThat(actual.isEquivalentTo(expected))
-        .overridingErrorMessage("Expected: %s, actual: %s", expected, actual);
-
-    filter =
-        new ParquetMetricsRowGroupFilter(SCHEMA, Expressions.and(mightMatch, cannotMatch), true);
-    expected = Expressions.alwaysFalse();
-    actual = filter.residualFor(parquetSchema, rowGroupMetadata);
-    Assertions.assertThat(actual.isEquivalentTo(expected))
-        .overridingErrorMessage("Expected: %s, actual: %s", expected, actual);
-  }

private boolean shouldRead(Expression expression) {
return shouldRead(expression, true);
}
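
Worth noting: the assertions in the test removed above never actually checked anything. AssertJ's overridingErrorMessage(...), like as(...), only sets the assertion's description; without a terminal call such as isTrue(), the chain performs no verification, so the test could never fail. The testParquetFindsResidual removed from TestBloomRowGroupFilter further down has the same flaw, which is presumably part of what this commit's replacement tests address. A corrected form of the removed assertion would look like this sketch (same variables as the removed test):

    // Sketch only: the removed assertion, completed with the terminal call it
    // was missing. Without .isTrue(), AssertJ records a description but
    // verifies nothing.
    Assertions.assertThat(actual.isEquivalentTo(expected))
        .overridingErrorMessage("Expected: %s, actual: %s", expected, actual)
        .isTrue();
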
8 changes: 4 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetBloomRowGroupFilter.java
@@ -299,7 +299,7 @@ private <T> boolean shouldRead(
hashValue = bloom.hash(((Number) value).intValue());
return bloom.findHash(hashValue);
default:
-            return true; /* rows might match */
+            return true; /* rows might match */
}
case INT64:
switch (type.typeId()) {
@@ -313,7 +313,7 @@
hashValue = bloom.hash(((Number) value).longValue());
return bloom.findHash(hashValue);
default:
-            return true; /* rows might match */
+            return true; /* rows might match */
}
case FLOAT:
hashValue = bloom.hash(((Number) value).floatValue());
@@ -346,10 +346,10 @@
hashValue = bloom.hash(Binary.fromConstantByteArray(UUIDUtil.convert((UUID) value)));
return bloom.findHash(hashValue);
default:
-            return true; /* rows might match */
+            return true; /* rows might match */
        }
      default:
-        return true; /* rows might match */
+        return true; /* rows might match */
}
}

84 changes: 84 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetCombinedRowGroupFilter.java
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;

public class ParquetCombinedRowGroupFilter {

private final Schema schema;
private final Expression expr;
private final boolean caseSensitive;

public ParquetCombinedRowGroupFilter(Schema schema, Expression expr, boolean caseSensitive) {
this.schema = schema;
Types.StructType struct = schema.asStruct();
if (Binder.isBound(expr)) {
this.expr = Expressions.rewriteNot(expr);
} else {
this.expr = Binder.bind(struct, Expressions.rewriteNot(expr), caseSensitive);
}

this.caseSensitive = caseSensitive;
}

  /**
   * Tests whether the row group may contain records that match the expression.
   *
   * @param fileSchema schema for the Parquet file
   * @param rowGroup metadata for a row group
   * @param reader file reader for the Parquet file
   * @return false if the row group cannot contain rows that match the expression, true otherwise.
   */
public boolean shouldRead(
MessageType fileSchema, BlockMetaData rowGroup, ParquetFileReader reader) {
return residualFor(fileSchema, rowGroup, reader) != Expressions.alwaysFalse();
}

public Expression residualFor(
MessageType fileSchema, BlockMetaData rowGroup, ParquetFileReader reader) {
ParquetMetricsRowGroupFilter metricFilter =
new ParquetMetricsRowGroupFilter(schema, expr, caseSensitive);
Expression metricResidual = metricFilter.residualFor(fileSchema, rowGroup);
if (metricResidual == Expressions.alwaysFalse() || metricResidual == Expressions.alwaysTrue()) {
return metricResidual;
}

ParquetDictionaryRowGroupFilter dictFilter =
new ParquetDictionaryRowGroupFilter(schema, metricResidual, caseSensitive);

Expression dictResidual =
dictFilter.residualFor(fileSchema, rowGroup, reader.getDictionaryReader(rowGroup));

if (dictResidual == Expressions.alwaysFalse() || dictResidual == Expressions.alwaysTrue()) {
return dictResidual;
}

ParquetBloomRowGroupFilter bloomFilter =
new ParquetBloomRowGroupFilter(schema, dictResidual, caseSensitive);
return bloomFilter.residualFor(fileSchema, rowGroup, reader.getBloomFilterDataReader(rowGroup));
}
}
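
To make the new class concrete, here is a minimal, hypothetical usage sketch (not part of this commit) showing how a caller could prune row groups with it, mirroring how ReadConf wires it up in the next file. It assumes the sketch lives in the org.apache.iceberg.parquet package, since the ParquetIO helper used to adapt Iceberg's InputFile appears to be package-private; names like CombinedFilterUsageSketch and matchingRowGroups are illustrative.

    package org.apache.iceberg.parquet;

    import java.io.IOException;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.io.InputFile;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.schema.MessageType;

    class CombinedFilterUsageSketch {
      /** Counts the row groups of a Parquet file that may contain rows matching the filter. */
      static int matchingRowGroups(Schema schema, Expression filter, InputFile file)
          throws IOException {
        ParquetCombinedRowGroupFilter combined =
            new ParquetCombinedRowGroupFilter(schema, filter, true /* caseSensitive */);
        try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) {
          MessageType fileSchema = reader.getFileMetaData().getSchema();
          int count = 0;
          for (BlockMetaData rowGroup : reader.getRowGroups()) {
            // Chains metrics -> dictionary -> bloom residuals; false means the row
            // group provably contains no matching rows and can be skipped.
            if (combined.shouldRead(fileSchema, rowGroup, reader)) {
              count += 1;
            }
          }
          return count;
        }
      }
    }
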
37 changes: 6 additions & 31 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -29,7 +29,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -101,15 +100,18 @@ class ReadConf<T> {
// Fetch all row groups starting positions to compute the row offsets of the filtered row groups
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos(expectedSchema);

+    ParquetCombinedRowGroupFilter combinedFilter = null;
+    if (filter != null) {
+      combinedFilter = new ParquetCombinedRowGroupFilter(expectedSchema, filter, caseSensitive);
+    }
+
long computedTotalValues = 0L;
for (int i = 0; i < shouldSkip.length; i += 1) {
BlockMetaData rowGroup = rowGroups.get(i);
startRowPositions[i] =
offsetToStartPos == null ? 0 : offsetToStartPos.get(rowGroup.getStartingPos());
boolean shouldRead =
-          filter == null
-              || (findsResidual(filter, expectedSchema, typeWithIds, rowGroup, caseSensitive)
-                  != Expressions.alwaysFalse());
+          filter == null || combinedFilter.shouldRead(fileSchema, rowGroup, reader);

this.shouldSkip[i] = !shouldRead;
if (shouldRead) {
@@ -254,31 +256,4 @@ private List<Map<ColumnPath, ColumnChunkMetaData>> getColumnChunkMetadataForRowG
}
return listBuilder.build();
}

-  private Expression findsResidual(
-      Expression expr,
-      Schema expectedSchema,
-      MessageType typeWithIds,
-      BlockMetaData rowGroup,
-      boolean caseSensitive) {
-    ParquetMetricsRowGroupFilter metricFilter =
-        new ParquetMetricsRowGroupFilter(expectedSchema, expr, caseSensitive);
-    Expression metricResidual = metricFilter.residualFor(typeWithIds, rowGroup);
-    if (metricResidual == Expressions.alwaysFalse() || metricResidual == Expressions.alwaysTrue()) {
-      return metricResidual;
-    }
-
-    ParquetDictionaryRowGroupFilter dictFilter =
-        new ParquetDictionaryRowGroupFilter(expectedSchema, metricResidual, caseSensitive);
-    Expression dictResidual =
-        dictFilter.residualFor(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup));
-    if (dictResidual == Expressions.alwaysFalse() || dictResidual == Expressions.alwaysTrue()) {
-      return dictResidual;
-    }
-
-    ParquetBloomRowGroupFilter bloomFilter =
-        new ParquetBloomRowGroupFilter(expectedSchema, dictResidual, caseSensitive);
-    return bloomFilter.residualFor(
-        typeWithIds, rowGroup, reader.getBloomFilterDataReader(rowGroup));
-  }
}
41 changes: 1 addition & 40 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -57,9 +57,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -185,7 +183,6 @@ public class TestBloomRowGroupFilter {
private MessageType parquetSchema = null;
private BlockMetaData rowGroupMetadata = null;
private BloomFilterReader bloomStore = null;
-  private ParquetFileReader reader;

@TempDir private File temp;

@@ -269,7 +266,7 @@ public void createInputFile() throws IOException {

InputFile inFile = Files.localInput(temp);

-    reader = ParquetFileReader.open(ParquetIO.file(inFile));
+    ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inFile));

assertThat(reader.getRowGroups()).as("Should create only one row group").hasSize(1);
rowGroupMetadata = reader.getRowGroups().get(0);
@@ -1192,40 +1189,4 @@ public void testTransformFilter() {
.as("Should read: filter contains non-reference evaluate as True")
.isTrue();
}

-  @Test
-  public void testParquetFindsResidual() {
-    // `string` col has no dict, this should be eliminated by bloom filter
-    Expression bloom = equal("string", "BINARY测试_301");
-    // should be eliminated by dictionary filter
-    Expression dict = equal("no_stats", "a");
-    // should be eliminated by metrics filter
-    Expression metric = greaterThan("id", INT_MAX_VALUE);
-    Expression expr = or(bloom, or(dict, metric));
-
-    Expression expected = Binder.bind(SCHEMA.asStruct(), Expressions.or(bloom, dict), true);
-    ParquetMetricsRowGroupFilter metricFilter = new ParquetMetricsRowGroupFilter(SCHEMA, expr);
-    Expression metricResidual = metricFilter.residualFor(parquetSchema, rowGroupMetadata);
-    assertThat(expected.isEquivalentTo(metricResidual))
-        .as("Expected residual: %s, actual residual: %s", expected, metricResidual);
-
-    expected = Binder.bind(SCHEMA.asStruct(), bloom, true);
-    ParquetDictionaryRowGroupFilter dictFilter =
-        new ParquetDictionaryRowGroupFilter(SCHEMA, metricResidual);
-    Expression dictResidual =
-        dictFilter.residualFor(
-            parquetSchema, rowGroupMetadata, reader.getDictionaryReader(rowGroupMetadata));
-
-    assertThat(expected.isEquivalentTo(dictResidual))
-        .as("Expected residual: %s, actual residual: %s", expected, dictResidual);
-
-    expected = Expressions.alwaysFalse();
-    ParquetBloomRowGroupFilter bloomFilter = new ParquetBloomRowGroupFilter(SCHEMA, dictResidual);
-    Expression bloomResidual =
-        bloomFilter.residualFor(
-            parquetSchema, rowGroupMetadata, reader.getBloomFilterDataReader(rowGroupMetadata));
-
-    assertThat(expected.isEquivalentTo(bloomResidual))
-        .as("Expected residual: %s, actual residual: %s", expected, bloomResidual);
-  }
}
