Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
Merge pull request apache#9104 from riazela/BeamTableStatistics
Browse files Browse the repository at this point in the history
[BEAM-7783] Adding BeamTableStatistics.
  • Loading branch information
akedin committed Jul 22, 2019
2 parents 7ca8b76 + 3a0d9c4 commit ff7a803
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static TextRowCountEstimator.Builder builder() {
* @throws org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the sampled
* lines are empty and we have not read all the lines in the matched files.
*/
public Long estimateRowCount(PipelineOptions pipelineOptions)
public Double estimateRowCount(PipelineOptions pipelineOptions)
throws IOException, NoEstimationException {
long linesSize = 0;
int numberOfReadLines = 0;
Expand Down Expand Up @@ -129,7 +129,7 @@ public Long estimateRowCount(PipelineOptions pipelineOptions)
}

if (numberOfReadLines == 0 && sampledEverything) {
return 0L;
return 0d;
}

if (numberOfReadLines == 0) {
Expand All @@ -138,7 +138,7 @@ public Long estimateRowCount(PipelineOptions pipelineOptions)
}

// This is total file sizes divided by average line size.
return totalFileSizes * numberOfReadLines / linesSize;
return (double) totalFileSizes * numberOfReadLines / linesSize;
}

/** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ public void testNonEmptyFiles() throws Exception {
writer.close();
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build();
Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Assert.assertNotNull(rows);
Assert.assertEquals(150L, rows.longValue());
Assert.assertEquals(150d, rows, 0.01);
}

@Test(expected = FileNotFoundException.class)
public void testEmptyFolder() throws Exception {
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build();
Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
}

@Test
Expand All @@ -82,8 +82,8 @@ public void testEmptyFile() throws Exception {
writer.close();
TextRowCountEstimator textRowCountEstimator =
TextRowCountEstimator.builder().setFilePattern(temporaryFolder.getRoot() + "/**").build();
Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Assert.assertEquals(0L, rows.longValue());
Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Assert.assertEquals(0d, rows, 0.01);
}

@Test(expected = TextRowCountEstimator.NoEstimationException.class)
Expand All @@ -110,7 +110,7 @@ public void testNonExistence() throws Exception {
TextRowCountEstimator.builder()
.setFilePattern(temporaryFolder.getRoot() + "/something/**")
.build();
Long rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Double rows = textRowCountEstimator.estimateRowCount(PipelineOptionsFactory.create());
Assert.assertNull(rows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql;

import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PBegin;
Expand All @@ -40,7 +40,7 @@ public interface BeamSqlTable {
Schema getSchema();

/**
 * Estimates the number of rows, or returns the {@code UNKNOWN} statistics constant if there is no
 * estimation.
 */
default BeamRowCountStatistics getRowCount(PipelineOptions options) {
return BeamRowCountStatistics.UNKNOWN;
default BeamTableStatistics getRowCount(PipelineOptions options) {
return BeamTableStatistics.UNKNOWN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.QueryProvider;
Expand All @@ -42,7 +41,6 @@
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.TranslatableTable;

/** Adapter from {@link BeamSqlTable} to a calcite Table. */
Expand Down Expand Up @@ -91,18 +89,16 @@ public Statistic getStatistic() {
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
BeamRowCountStatistics beamStatistics = beamTable.getRowCount(getPipelineOptions());
return beamStatistics.isUnknown()
? Statistics.UNKNOWN
: Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of());
return beamTable.getRowCount(getPipelineOptions());
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}

@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptionsMap);
return new BeamIOSourceRel(
context.getCluster(), relOptTable, beamTable, pipelineOptionsMap, this);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl;

import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelReferentialConstraint;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.util.ImmutableBitSet;

/**
 * Table statistics exposed to Calcite: a row count, a rate, and a flag telling whether the values
 * are real estimates or placeholders.
 *
 * <p>Instances are immutable and are obtained either from the {@link #UNKNOWN} / {@link
 * #UNBOUNDED_UNKNOWN} constants or from the static factory methods.
 */
@Experimental
@Internal
public class BeamTableStatistics implements Serializable, Statistic {
  /** Placeholder flagged as unknown, carrying a row count of 100 and a rate of 0. */
  public static final BeamTableStatistics UNKNOWN = new BeamTableStatistics(100d, 0d, true);

  /** Placeholder flagged as unknown, carrying a row count of 0 and a rate of 0.1. */
  public static final BeamTableStatistics UNBOUNDED_UNKNOWN =
      new BeamTableStatistics(0d, 0.1, true);

  private final boolean unknown;
  private final Double rowCount;
  private final Double rate;

  /**
   * Sole constructor; all instances are created through the constants or the factories below.
   *
   * @param rows value reported by {@link #getRowCount()}
   * @param ratePerUnit value reported by {@link #getRate()}
   * @param placeholder value reported by {@link #isUnknown()}
   */
  private BeamTableStatistics(Double rows, Double ratePerUnit, boolean placeholder) {
    this.unknown = placeholder;
    this.rowCount = rows;
    this.rate = ratePerUnit;
  }

  /**
   * Creates statistics that are not flagged as unknown, with the given row count and a rate of 0.
   *
   * @param rowCount the row count to report
   * @return a new statistics instance
   */
  public static BeamTableStatistics createBoundedTableStatistics(Double rowCount) {
    return new BeamTableStatistics(rowCount, 0d, false);
  }

  /**
   * Creates statistics that are not flagged as unknown, with the given rate and a row count of 0.
   *
   * @param rate the rate to report (NOTE(review): unit is not visible here, presumably rows per
   *     unit of time; confirm with callers)
   * @return a new statistics instance
   */
  public static BeamTableStatistics createUnboundedTableStatistics(Double rate) {
    return new BeamTableStatistics(0d, rate, false);
  }

  /** Returns {@code true} when this instance was created as an "unknown" placeholder. */
  public boolean isUnknown() {
    return unknown;
  }

  /** Returns the rate stored in this instance. */
  public Double getRate() {
    return rate;
  }

  /** Returns the row count stored in this instance. */
  @Override
  public Double getRowCount() {
    return rowCount;
  }

  /** Always reports {@code false}: no column set is declared as a key. */
  @Override
  public boolean isKey(ImmutableBitSet columns) {
    return false;
  }

  /** Always returns an empty list: no referential constraints are declared. */
  @Override
  public List<RelReferentialConstraint> getReferentialConstraints() {
    return ImmutableList.of();
  }

  /** Always returns an empty list: no collations are declared. */
  @Override
  public List<RelCollation> getCollations() {
    return ImmutableList.of();
  }

  /** Returns the default distribution of {@link RelDistributionTraitDef#INSTANCE}. */
  @Override
  public RelDistribution getDistribution() {
    return RelDistributionTraitDef.INSTANCE.getDefault();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,43 @@

import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

/** BeamRelNode to replace a {@code TableScan} node. */
public class BeamIOSourceRel extends TableScan implements BeamRelNode {

private final BeamSqlTable sqlTable;
private final BeamSqlTable beamTable;
private final BeamCalciteTable calciteTable;
private final Map<String, String> pipelineOptions;

public BeamIOSourceRel(
RelOptCluster cluster,
RelOptTable table,
BeamSqlTable sqlTable,
Map<String, String> pipelineOptions) {
BeamSqlTable beamTable,
Map<String, String> pipelineOptions,
BeamCalciteTable calciteTable) {
super(cluster, cluster.traitSetOf(BeamLogicalConvention.INSTANCE), table);
this.sqlTable = sqlTable;
this.beamTable = beamTable;
this.calciteTable = calciteTable;
this.pipelineOptions = pipelineOptions;
}

@Override
public double estimateRowCount(RelMetadataQuery mq) {
return super.estimateRowCount(mq);
}

@Override
public PCollection.IsBounded isBounded() {
return sqlTable.isBounded();
return beamTable.isBounded();
}

@Override
Expand All @@ -64,12 +74,12 @@ public PCollection<Row> expand(PCollectionList<Row> input) {
"Should not have received input for %s: %s",
BeamIOSourceRel.class.getSimpleName(),
input);
return sqlTable.buildIOReader(input.getPipeline().begin());
return beamTable.buildIOReader(input.getPipeline().begin());
}
}

protected BeamSqlTable getBeamSqlTable() {
return sqlTable;
return beamTable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.Serializable;
import java.math.BigInteger;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
Expand All @@ -47,7 +47,7 @@
class BigQueryTable extends BaseBeamTable implements Serializable {
@VisibleForTesting final String bqLocation;
private final ConversionOptions conversionOptions;
private BeamRowCountStatistics rowCountStatistics = null;
private BeamTableStatistics rowCountStatistics = null;
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class);

BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
Expand All @@ -57,7 +57,7 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
}

@Override
public BeamRowCountStatistics getRowCount(PipelineOptions options) {
public BeamTableStatistics getRowCount(PipelineOptions options) {

if (rowCountStatistics == null) {
rowCountStatistics = getRowCountFromBQ(options, bqLocation);
Expand Down Expand Up @@ -93,22 +93,22 @@ public POutput buildIOWriter(PCollection<Row> input) {
.to(bqLocation));
}

private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) {
private static BeamTableStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) {
try {
BigInteger rowCount =
BigQueryHelpers.getNumRows(
o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation));

if (rowCount == null) {
return BeamRowCountStatistics.UNKNOWN;
return BeamTableStatistics.UNKNOWN;
}

return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
return BeamTableStatistics.createBoundedTableStatistics(rowCount.doubleValue());

} catch (IOException | InterruptedException e) {
LOGGER.warn("Could not get the row count for the table " + bqLocation, e);
}

return BeamRowCountStatistics.UNKNOWN;
return BeamTableStatistics.UNKNOWN;
}
}
Loading

0 comments on commit ff7a803

Please sign in to comment.