Skip to content

Commit

Permalink
Add physical file positions to OrcRecordReader
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jun 4, 2015
1 parent e6d0f57 commit be093a2
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Comparator.comparingLong;

public class OrcRecordReader
{
Expand All @@ -55,6 +60,10 @@ public class OrcRecordReader
private final StripeReader stripeReader;
private int currentStripe = -1;

private final long fileRowCount;
private final List<Long> stripeFilePositions;
private long filePosition = -1;

private Iterator<RowGroup> rowGroups = ImmutableList.<RowGroup>of().iterator();
private long currentGroupRowCount;
private long nextRowInGroup;
Expand Down Expand Up @@ -106,20 +115,38 @@ public OrcRecordReader(
// it is possible that old versions of orc use 0 to mean there are no row groups
checkArgument(rowsInRowGroup > 0, "rowsInRowGroup must be greater than zero");

// sort stripes by file position
List<StripeInfo> stripeInfos = new ArrayList<>();
for (int i = 0; i < fileStripes.size(); i++) {
Optional<StripeStatistics> stats = Optional.empty();
// ignore all stripe stats if too few or too many
if (stripeStats.size() == fileStripes.size()) {
stats = Optional.of(stripeStats.get(i));
}
stripeInfos.add(new StripeInfo(fileStripes.get(i), stats));
}
Collections.sort(stripeInfos, comparingLong(info -> info.getStripe().getOffset()));

long totalRowCount = 0;
long fileRowCount = 0;
ImmutableList.Builder<StripeInformation> stripes = ImmutableList.builder();
ImmutableList.Builder<Long> stripeFilePositions = ImmutableList.builder();
if (predicate.matches(numberOfRows, getStatisticsByColumnOrdinal(root, fileStats))) {
// select stripes that start within the specified split
for (int stripeIndex = 0; stripeIndex < fileStripes.size(); stripeIndex++) {
StripeInformation stripe = fileStripes.get(stripeIndex);
if (splitContainsStripe(splitOffset, splitLength, stripe) && isStripeIncluded(root, stripe, stripeStats, predicate, stripeIndex)) {
for (StripeInfo info : stripeInfos) {
StripeInformation stripe = info.getStripe();
if (splitContainsStripe(splitOffset, splitLength, stripe) && isStripeIncluded(root, stripe, info.getStats(), predicate)) {
stripes.add(stripe);
stripeFilePositions.add(fileRowCount);
totalRowCount += stripe.getNumberOfRows();
}
fileRowCount += stripe.getNumberOfRows();
}
}
this.totalRowCount = totalRowCount;
this.stripes = stripes.build();
this.stripeFilePositions = stripeFilePositions.build();
this.fileRowCount = fileRowCount;

stripeReader = new StripeReader(
orcDataSource,
Expand All @@ -143,23 +170,47 @@ private static boolean splitContainsStripe(long splitOffset, long splitLength, S
private static boolean isStripeIncluded(
OrcType rootStructType,
StripeInformation stripe,
List<StripeStatistics> stripeStats,
OrcPredicate predicate,
int stripeIndex)
Optional<StripeStatistics> stripeStats,
OrcPredicate predicate)
{
// if there are no stats, include the stripe
if (stripeIndex >= stripeStats.size()) {
if (!stripeStats.isPresent()) {
return true;
}
return predicate.matches(stripe.getNumberOfRows(), getStatisticsByColumnOrdinal(rootStructType, stripeStats.get().getColumnStatistics()));
}

/**
 * Returns the current row position relative to the start of the file.
 * The position is set to the first row of a stripe when the reader advances
 * to that stripe, and is then moved forward by the size of every batch
 * produced by {@code nextBatch()}.
 *
 * @return the row position within the file
 * @throws IllegalStateException if no position has been established yet
 */
public long getFilePosition()
{
    // the field holds a negative sentinel until the first stripe has been entered
    boolean positioned = this.filePosition >= 0;
    checkState(positioned, "file position is only valid after nextBatch()");
    return this.filePosition;
}

return predicate.matches(stripe.getNumberOfRows(), getStatisticsByColumnOrdinal(rootStructType, stripeStats.get(stripeIndex).getColumnStatistics()));
/**
 * Returns the number of rows in the file as counted when this reader was
 * constructed. Rows of stripes that were excluded by stripe statistics are
 * part of this count.
 * <p>
 * NOTE(review): the count appears to be accumulated only when the file-level
 * predicate matches; confirm what callers should expect otherwise.
 *
 * @return the row count of the file
 */
public long getFileRowCount()
{
    return this.fileRowCount;
}

/**
 * Returns the row position counted only over the stripes this reader
 * actually reads. It grows by the batch size on every successful
 * {@code nextBatch()} call.
 *
 * @return the row position within the selected stripes
 */
public long getPosition()
{
    return this.currentPosition;
}

/**
* Returns the total number of rows that are available for this reader.
* This count may be fewer than the number of rows in the file if some
* stripes were excluded due to stripe statistics.
*/
public long getTotalRowCount()
{
return totalRowCount;
Expand Down Expand Up @@ -204,6 +255,7 @@ public int nextBatch()
column.prepareNextRead(batchSize);
}
}
filePosition += batchSize;
nextRowInGroup += batchSize;
currentPosition += batchSize;
return batchSize;
Expand Down Expand Up @@ -250,6 +302,7 @@ private void advanceToNextStripe()
if (currentStripe >= stripes.size()) {
return;
}
filePosition = stripeFilePositions.get(currentStripe);

StripeInformation stripeInformation = stripes.get(currentStripe);
Stripe stripe = stripeReader.readStripe(stripeInformation);
Expand Down Expand Up @@ -327,4 +380,26 @@ private static Map<Integer, ColumnStatistics> getStatisticsByColumnOrdinal(OrcTy
}
return statistics.build();
}

/**
 * Pairs a stripe with its (optional) statistics so that both can be sorted
 * together by the stripe's offset in the file.
 */
private static class StripeInfo
{
    private final StripeInformation stripe;
    private final Optional<StripeStatistics> stats;

    /**
     * @param stripe the stripe description, must not be null
     * @param stats the statistics of the stripe, or empty when none are usable; must not be null
     */
    public StripeInfo(StripeInformation stripe, Optional<StripeStatistics> stats)
    {
        this.stripe = checkNotNull(stripe, "stripe is null");
        // the message names the actual parameter so a failure points at the right argument
        this.stats = checkNotNull(stats, "stats is null");
    }

    /**
     * @return the stripe description
     */
    public StripeInformation getStripe()
    {
        return stripe;
    }

    /**
     * @return the statistics of the stripe, or empty when none are available
     */
    public Optional<StripeStatistics> getStats()
    {
        return stats;
    }
}
}
12 changes: 7 additions & 5 deletions presto-orc/src/test/java/com/facebook/presto/orc/OrcTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ else if (skipFirstBatch && isFirst) {
}
}
rowsProcessed += batchSize;
assertEquals(recordReader.getPosition(), rowsProcessed);
assertEquals(recordReader.getFilePosition(), rowsProcessed);
}
assertFalse(iterator.hasNext());
recordReader.close();
Expand Down Expand Up @@ -416,7 +418,7 @@ private static Vector createResultsVector(ObjectInspector objectInspector)
}
}

private static OrcRecordReader createCustomOrcRecordReader(TempFile tempFile, MetadataReader metadataReader, OrcPredicate predicate, Type type)
static OrcRecordReader createCustomOrcRecordReader(TempFile tempFile, MetadataReader metadataReader, OrcPredicate predicate, Type type)
throws IOException
{
OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE), new DataSize(1, Unit.MEGABYTE));
Expand Down Expand Up @@ -504,7 +506,7 @@ private static void setFieldValue(Object instance, String name, Object value)
}
}

private static RecordWriter createOrcRecordWriter(File outputFile, Format format, Compression compression, ObjectInspector columnObjectInspector)
static RecordWriter createOrcRecordWriter(File outputFile, Format format, Compression compression, ObjectInspector columnObjectInspector)
throws IOException
{
JobConf jobConf = new JobConf();
Expand Down Expand Up @@ -543,7 +545,7 @@ private static RecordWriter createDwrfRecordWriter(File outputFile, Compression
);
}

private static SettableStructObjectInspector createSettableStructObjectInspector(String name, ObjectInspector objectInspector)
static SettableStructObjectInspector createSettableStructObjectInspector(String name, ObjectInspector objectInspector)
{
return getStandardStructObjectInspector(ImmutableList.of(name), ImmutableList.of(objectInspector));
}
Expand All @@ -556,12 +558,12 @@ private static Properties createTableProperties(String name, String type)
return orderTableProperties;
}

private static class TempFile
static class TempFile
implements Closeable
{
private final File file;

private TempFile(String prefix, String suffix)
public TempFile(String prefix, String suffix)
{
try {
file = File.createTempFile(prefix, suffix);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;

import com.facebook.presto.orc.OrcTester.TempFile;
import com.facebook.presto.orc.metadata.IntegerStatistics;
import com.facebook.presto.orc.metadata.OrcMetadataReader;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.Writable;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;

import static com.facebook.presto.orc.OrcTester.Format.ORC_12;
import static com.facebook.presto.orc.OrcTester.createCustomOrcRecordReader;
import static com.facebook.presto.orc.OrcTester.createOrcRecordWriter;
import static com.facebook.presto.orc.OrcTester.createSettableStructObjectInspector;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaLongObjectInspector;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

public class TestOrcReaderPositions
{
/**
 * Reads the whole multi-stripe file without any filtering and verifies that
 * the reader position and the file position stay in lock step.
 */
@Test
public void testEntireFile()
        throws Exception
{
    try (TempFile tempFile = new TempFile("test", "orc")) {
        createMultiStripeFile(tempFile.getFile());

        OrcRecordReader reader = createCustomOrcRecordReader(tempFile, new OrcMetadataReader(), OrcPredicate.TRUE, BIGINT);
        // close the reader even when an assertion fails, so it is not leaked
        try {
            assertEquals(reader.getFileRowCount(), 100);
            assertEquals(reader.getTotalRowCount(), 100);
            assertEquals(reader.getPosition(), 0);

            // one batch per stripe: 5 stripes of 20 rows each
            for (int i = 0; i < 5; i++) {
                assertEquals(reader.nextBatch(), 20);
                assertEquals(reader.getPosition(), (i + 1) * 20L);
                assertEquals(reader.getFilePosition(), reader.getPosition());
                assertCurrentBatch(reader, i);
            }

            // end of data: positions must not move any further
            assertEquals(reader.nextBatch(), -1);
            assertEquals(reader.getPosition(), 100);
            assertEquals(reader.getFilePosition(), reader.getPosition());
        }
        finally {
            reader.close();
        }
    }
}

/**
 * Reads only the second and fourth stripes (selected through their integer
 * statistics) and verifies that the file position accounts for the skipped
 * stripes while the reader position does not.
 */
@Test
public void testStripeSkipping()
        throws Exception
{
    try (TempFile tempFile = new TempFile("test", "orc")) {
        createMultiStripeFile(tempFile.getFile());

        // test reading second and fourth stripes
        OrcPredicate predicate = (numberOfRows, statisticsByColumnIndex) -> {
            // 100 rows is the whole file: always accept it so stripe-level filtering is reached
            if (numberOfRows == 100) {
                return true;
            }
            IntegerStatistics stats = statisticsByColumnIndex.get(0).getIntegerStatistics();
            return ((stats.getMin() == 60) && (stats.getMax() == 117)) ||
                    ((stats.getMin() == 180) && (stats.getMax() == 237));
        };

        OrcRecordReader reader = createCustomOrcRecordReader(tempFile, new OrcMetadataReader(), predicate, BIGINT);
        // close the reader even when an assertion fails, so it is not leaked
        try {
            assertEquals(reader.getFileRowCount(), 100);
            assertEquals(reader.getTotalRowCount(), 40);
            assertEquals(reader.getPosition(), 0);

            // second stripe
            assertEquals(reader.nextBatch(), 20);
            assertEquals(reader.getPosition(), 20);
            assertEquals(reader.getFilePosition(), 40);
            assertCurrentBatch(reader, 1);

            // fourth stripe
            assertEquals(reader.nextBatch(), 20);
            assertEquals(reader.getPosition(), 40);
            assertEquals(reader.getFilePosition(), 80);
            assertCurrentBatch(reader, 3);

            // end of data: positions must not move any further
            assertEquals(reader.nextBatch(), -1);
            assertEquals(reader.getPosition(), 40);
            assertEquals(reader.getFilePosition(), 80);
        }
        finally {
            reader.close();
        }
    }
}

/**
 * Verifies that asking for the file position before any batch has been read
 * is rejected with an {@link IllegalStateException}.
 */
@Test
public void testFilePositionBeforeNextBatch()
        throws Exception
{
    try (TempFile tempFile = new TempFile("test", "orc")) {
        createMultiStripeFile(tempFile.getFile());

        OrcRecordReader reader = createCustomOrcRecordReader(tempFile, new OrcMetadataReader(), OrcPredicate.TRUE, BIGINT);

        try {
            reader.getFilePosition();
            fail("expected exception");
        }
        catch (IllegalStateException ignored) {
            // expected: no batch has been read yet
        }
        finally {
            // close the reader even when fail() fires, so it is not leaked
            reader.close();
        }
    }
}

/**
 * Reads column 0 of the current batch and checks that it holds the 20
 * consecutive multiples of three that were written for the given stripe.
 *
 * @param reader the reader positioned on the batch to verify
 * @param stripe zero-based index of the stripe the batch is expected to come from
 */
private static void assertCurrentBatch(OrcRecordReader reader, int stripe)
        throws IOException
{
    LongVector batch = new LongVector(20);
    reader.readVector(0, batch);

    // first value of the stripe; every following row is 3 larger
    long expected = stripe * 20L * 3;
    for (int offset = 0; offset < 20; offset++) {
        assertEquals(batch.vector[offset], expected);
        expected += 3;
    }
}

// Writes the 100 values 0, 3, 6, .., 297 and flushes the writer after every
// 20 values, giving 5 stripes: (0,..,57), (60,..,117), .., (240,..,297)
private static void createMultiStripeFile(File file)
        throws IOException, ReflectiveOperationException, SerDeException
{
    FileSinkOperator.RecordWriter writer = createOrcRecordWriter(file, ORC_12, OrcTester.Compression.NONE, javaLongObjectInspector);

    @SuppressWarnings("deprecation") Serializer serde = new OrcSerde();
    SettableStructObjectInspector objectInspector = createSettableStructObjectInspector("test", javaLongObjectInspector);
    Object row = objectInspector.create();
    StructField field = objectInspector.getAllStructFieldRefs().get(0);

    for (int index = 0; index < 100; index++) {
        // start a new stripe before the first row of every group of 20 except the very first
        boolean stripeBoundary = (index != 0) && (index % 20 == 0);
        if (stripeBoundary) {
            flushWriter(writer);
        }

        long value = index * 3L;
        objectInspector.setStructFieldData(row, field, value);
        writer.write(serde.serialize(row, objectInspector));
    }

    writer.close(false);
}

/**
 * Calls {@code writeIntermediateFooter()} on the ORC {@link Writer} held by
 * the given record writer. The writer is stored in the {@code writer} field
 * of the nested {@code OrcRecordWriter} class and is reached via reflection.
 *
 * @param writer the record writer created by {@link OrcOutputFormat}
 */
private static void flushWriter(FileSinkOperator.RecordWriter writer)
        throws IOException, ReflectiveOperationException
{
    String recordWriterClassName = OrcOutputFormat.class.getName() + "$OrcRecordWriter";
    Class<?> recordWriterClass = OrcOutputFormat.class.getClassLoader().loadClass(recordWriterClassName);

    Field writerField = recordWriterClass.getDeclaredField("writer");
    writerField.setAccessible(true);

    Writer orcWriter = (Writer) writerField.get(writer);
    orcWriter.writeIntermediateFooter();
}
}

0 comments on commit be093a2

Please sign in to comment.