Spark Action to Analyze table #10288
base: main
Conversation
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistic of the given columns and stores it as Puffin files. */
```
AnalyzeTableSparkAction is a generic name. I see that in the future we want to compute partition stats too, which may not be written as Puffin files. Either we can change the name to computeNDVSketches, or make it generic so that any kind of stats can be computed from it.
Thinking more on this, I think we should just call it computeNDVSketches and not mix it with partition stats.
I tried to follow the model of RDBMSs and engines like Trino, which use ANALYZE TABLE <tblName> to collect all table-level stats. With a procedure-per-stat model, the user has to invoke a procedure/action for every stat, and with any new stat addition, the user needs to update their code to call the new procedure/action.

> not mix it with partition stats.

I think we could have partition stats as a separate action, since it is per partition, whereas this procedure can collect top-level table stats.
@karuppayya
I can see the tests in TestAnalyzeTableAction; it's working fine. But have we tested in Spark whether it works with a query like "Analyze table table1 compute statistics"? Because generally it gives the error:
"[NOT_SUPPORTED_COMMAND_FOR_V2_TABLE] ANALYZE TABLE is not supported for v2 tables."
Spark does not have the grammar for analyzing tables. This PR introduces a Spark action. In a subsequent PR, I plan to introduce an Iceberg procedure to invoke the Spark action.
I'll raise a PR for the spec update if there are no objections. Thanks!
@karuppayya Thanks for the great work! Sorry I didn't have time to take a look at your PR earlier. For ANALYZE TABLE, Spark has the following syntax:

```sql
ANALYZE TABLE table_identifier [ partition_spec ]
    COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col [ , ... ] | FOR ALL COLUMNS ]
```

For column-level stats, Spark computes:
- NDV
- Max (for numeric, Date, and Timestamp only)
- Min (for numeric, Date, and Timestamp only)
- Null Count
- Avg Len
- Max Len

We probably want to ensure that the Iceberg implementation aligns with Spark's current functionality.
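For illustration, a concrete invocation of that syntax against a hypothetical table (the table and column names here are made up, not from this PR):

```sql
ANALYZE TABLE db.events COMPUTE STATISTICS FOR COLUMNS id, category;
```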
Hi @huaxingao, do you mean that this PR should take care of the other stats too apart from NDV, like MIN, MAX, NULL Count, etc.?
Hi @karuppayya, are we planning to include the other column-level stats in this PR?
```java
spark(), table, columnsToBeAnalyzed.toArray(new String[0]));
table
    .updateStatistics()
    .setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
```
What if the table's current snapshot has been modified concurrently by another client between line 117 and line 120?
This is a good question. We should do #6442.
```java
public static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
    SparkSession spark, String tableName, String... columns) {
  String sql = String.format("select %s from %s", String.join(",", columns), tableName);
```
I think we should also think about incremental updates, updating sketches from a previous checkpoint. Querying the whole table may not be efficient.
Yes, incremental updates need to be wired into the ends of the write paths. This procedure could exist in parallel, to get stats for the whole table on demand.
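The reason the on-demand action and incremental maintenance can coexist is that theta sketches are mergeable: the union of two sketches estimates the distinct count of the combined data. A minimal sketch of that idea, using an exact set as a stand-in for the (approximate, bounded-memory) theta sketch; the real code would use the datasketches Union:

```java
import java.util.HashSet;
import java.util.Set;

public class MergeableNdv {
    // Stand-in for a theta sketch union: merging per-snapshot "sketches" yields
    // the distinct count of all data seen so far, without rescanning earlier
    // snapshots. A real theta sketch does this approximately in bounded memory.
    public static Set<String> union(Set<String> a, Set<String> b) {
        Set<String> merged = new HashSet<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        Set<String> storedSketch = new HashSet<>(Set.of("a", "b", "c"));
        Set<String> newWrites = new HashSet<>(Set.of("b", "d"));
        // Incremental update: union the stored sketch with the sketch of new data.
        System.out.println(union(storedSketch, newWrites).size()); // 4 distinct values
    }
}
```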
```java
assumeTrue(catalogName.equals("spark_catalog"));
sql(
    "CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
        + "('format-version'='2')",
```
The default format version itself is v2 now, so specifying it again is redundant.
```java
String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
OutputFile outputFile = fileIO.newOutputFile(path);
try (PuffinWriter writer =
    Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
```
I like this name instead of "analyze table procedure".

There was an old PR on the same: #6582

I don't have time to work on this, so karuppayya will take over. Thanks a lot @karuppayya for continuing the work.
Thanks @karuppayya @huaxingao @szehon-ho, this is awesome to see! I left a review of the API/implementation; I still have yet to review the tests, which look to be a WIP.
```java
 * @param statsToBeCollected set of statistics to be collected
 * @return this for method chaining
 */
AnalyzeTable stats(Set<String> statsToBeCollected);
```
Should these stats be a Set<StandardBlobType> instead of arbitrary Strings? I feel the API becomes more well defined in that case.
Oh I see, StandardBlobType defines string constants, not enums.
```java
private void validateColumns() {
  validateEmptyColumns();
  validateTypes();
}

private void validateEmptyColumns() {
  if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.isEmpty()) {
    throw new ValidationException("No columns to analyze for the table", table.name());
  }
}
```
Nit: I think this validation should just happen at the time of setting these on the action rather than at execution time.
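The fail-fast pattern suggested here (validate in the setter instead of at execute() time) can be sketched as below; AnalyzeTableBuilder and its methods are hypothetical simplifications for illustration, not the PR's actual classes:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class AnalyzeTableBuilder {
    private Set<String> columns = Collections.emptySet();

    // Validate at the time the columns are set, so a bad call site fails
    // immediately rather than later at execute() time.
    public AnalyzeTableBuilder columns(Set<String> cols) {
        if (cols == null || cols.isEmpty()) {
            throw new IllegalArgumentException("No columns to analyze");
        }
        this.columns = new LinkedHashSet<>(cols); // defensive copy
        return this;
    }

    public Set<String> execute() {
        // By the time execute() runs, the inputs are already known to be valid.
        return Collections.unmodifiableSet(columns);
    }
}
```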
```java
 * @return this for method chaining
 */
AnalyzeTable stats(Set<String> statsToBeCollected);
```
I also think this interface should have a snapshot API to allow users to pass in a snapshot to generate the statistics for. If it's not specified, then we can generate the statistics for the latest snapshot.
Should we support branch/tag as well? (I guess in a subsequent PR.)
The snapshot(String snapshotId) API has been added.

For branch/tag: is it an existing pattern to support this first class in APIs, or to require the caller to convert the information they have (branch/tag) into a snapshot ID?
Good point, snapshot should be fine.
```java
if (field == null) {
  throw new ValidationException("No column with %s name in the table", columnName);
}
```
Style nit: new line after the if.
```java
SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
    throws IOException {
  Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
      NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
```
Does computeNDVSketches need to be public? It seems like it can just be package-private. Also, nit: either way, I don't think you need the fully qualified method name.
```java
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSketchJavaSerializable implements Serializable {
```
Does this need to be public?
```java
if (sketch == null) {
  return null;
}
if (sketch instanceof UpdateSketch) {
  return sketch.compact();
}
```
Style nit: new line after if
```java
        null,
        ImmutableMap.of()));
  }
  writer.finish();
```
Nit: I don't think you need the writer.finish(), because the try-with-resources will close the writer, and close will already finish.
I think you need finish() to get the final fileSize(), etc.
```java
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
```
null means that the file will be uncompressed. I think it makes sense not to compress these files by default since the sketch will be a single long per column, so it'll be quite small already and not worth paying the price of compression/decompression.
> since the sketch will be a single long per column

The sketch should be more than that: a small number of KB, IIRC. Trino uses ZSTD for the blobs, and no compression for the footer.
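Whichever default is chosen, one practical point in this trade-off is that general-purpose compression has fixed framing overhead and serialized sketches are mostly high-entropy hash values. A small self-contained experiment with the JDK's Deflater (zlib; not the ZSTD mentioned above, which is not in the JDK) illustrates why compressing a few-KB incompressible blob gains little or nothing:

```java
import java.util.Random;
import java.util.zip.Deflater;

public class TinyBlobCompression {
    // Deflate a buffer and return only the bytes actually produced.
    public static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] buf = new byte[input.length * 2 + 64];
        int n = deflater.deflate(buf);
        deflater.end();
        byte[] out = new byte[n];
        System.arraycopy(buf, 0, out, 0, n);
        return out;
    }

    public static void main(String[] args) {
        // Model a serialized sketch as pseudo-random (incompressible) bytes;
        // the fixed seed makes the run repeatable.
        byte[] sketchLike = new byte[4096];
        new Random(42).nextBytes(sketchLike);
        byte[] compressed = deflate(sketchLike);
        // Deflate cannot shrink high-entropy data; it only adds overhead.
        System.out.println(sketchLike.length + " -> " + compressed.length);
    }
}
```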
```java
if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
  return emptySketchWrapped;
}
if (sketch1.getSketch() == null) {
  return sketch2;
}
if (sketch2.getSketch() == null) {
  return sketch1;
}
```
Style nit: new line after if
Force-pushed from 5538f6e to de520fc.
Hi @karuppayya thanks for the patch, I left a first round of comments.
```java
 * @param columns a set of column names to be analyzed
 * @return this for method chaining
 */
AnalyzeTable columns(Set<String> columns);
```
Nit: how about String... columns (see RewriteDataFiles)? Same for the others.
```java
 * @param statsToBeCollected set of statistics to be collected
 * @return this for method chaining
 */
AnalyzeTable stats(Set<String> statsToBeCollected);
```
Let's call it statistics, like StatisticsFile? See https://iceberg.apache.org/contribute/#java-style-guidelines — I think it can be interpreted differently, but I think point 3 implies we should use the full spelling if possible, and we don't have abbreviations for API methods in most of the code.
Also, statsToBeCollected => types?
```java
AnalyzeTable columns(Set<String> columns);

/**
 * A set of statistics to be collected on the given columns of the given table
```
"The set of statistics to be collected"? (Given columns, given tables is specified elsewhere.)
```java
 */
AnalyzeTable snapshot(String snapshotId);

/** The action result that contains a summary of the Analysis. */
```
Plural? "contains summaries of the analysis"? Also, if it is capitalized, it can be a Javadoc link.
```java
(PairFlatMapFunction<Iterator<Row>, String, String>)
    input -> {
      final List<Tuple2<String, String>> list = Lists.newArrayList();
      while (input.hasNext()) {
```
Can we use flatMap and mapToPair to make this more concise?

```java
data.javaRDD().flatMap(r -> {
  List<Tuple2<String, String>> list =
      Lists.newArrayListWithExpectedSize(columns.size());
  for (int i = 0; i < r.size(); i++) {
    list.add(new Tuple2<>(columns.get(i), r.get(i).toString()));
  }
  return list.iterator();
}).mapToPair(t -> t);
```
```java
return ImmutableAnalyzeTable.Result.builder().analysisResults(analysisResults).build();
}

private boolean analyzeableTypes(Set<String> columns) {
```
According to IntelliJ, there is a typo (analyzable).
```java
final JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
    pairs.aggregateByKey(
        new ThetaSketchJavaSerializable(),
        1, // number of partitions
```
Why limit to 1?
This code was just copied from the datasketches example. The value is used in the HashPartitioner behind the scenes. Should we set it to spark.sql.shuffle.partitions?
```java
return sketches.toLocalIterator();
}

static class Add
```
Can we use lambdas here for cleaner code? Like:

```java
(sketch, val) -> {
  sketch.update(val);
  return sketch;
},
```

The next one may be too complex to inline, but maybe we can reduce the ugly Java boilerplate.
```java
final Row row = input.next();
int size = row.size();
for (int i = 0; i < size; i++) {
  list.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
```
Question: does forcing the string type affect anything? I see the sketch library takes in other types.
Force-pushed from de520fc to 58d22d6.
Started a second round
```java
@Override
public AnalyzeTable snapshot(String snapshotIdStr) {
  this.snapshotId = Long.parseLong(snapshotIdStr);
```
I feel we should just make this take a long.
```java
StatisticsFile statisticsFile =
    NDVSketchGenerator.generateNDV(
        spark(), table, snapshotId, columns.toArray(new String[0]));
```
Can we do a similar thing with a columns() method, as I suggested with snapshots, that checks whether the user list is null/empty and falls back to all the table columns?

```java
private Set<String> columns() {
  return (columns == null) || columns.isEmpty()
      ? table.schema().columns().stream()
          .map(Types.NestedField::name)
          .collect(Collectors.toSet())
      : columns;
}
```

Then the logic is centralized here if we add more stats, rather than living in the NDV class.
```java
    .type(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)
    .build();
} catch (IOException ioe) {
  List<String> errors = Lists.newArrayList();
```
Are we only reporting an error on IOException (which looks like it comes from writing the Puffin file)? It seems a bit strange to catch just this specific case and not other exceptions. It seems more natural to either catch all exceptions and report an error, or else just throw all exceptions. What do you think?
```java
private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);

private final Table table;
private Set<String> columns = ImmutableSet.of();
```
I see we convert this from array to set and back a few times (it's passed in as an array, stored as a set, and then passed as an array to the NDVSketchGenerator.generateNDV function). Can we just keep this as an array the whole time (store it as an array here)?
Arrays are mutable, so if we decide to switch to arrays, please make sure to defensively copy whenever passing to another class.
```java
return sketches.toLocalIterator();
}

static class Combine
```
I think we can do this to get rid of the ugly Function2 definition, and also make the main method a bit cleaner:

```java
JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
    pairs.aggregateByKey(
        new ThetaSketchJavaSerializable(),
        Integer.parseInt(
            SQLConf.SHUFFLE_PARTITIONS().defaultValueString()), // number of partitions
        NDVSketchGenerator::update,
        NDVSketchGenerator::combine);
return sketches.toLocalIterator();
}

public static ThetaSketchJavaSerializable update(
    ThetaSketchJavaSerializable sketch, String val) {
  sketch.update(val);
  return sketch;
}

public static ThetaSketchJavaSerializable combine(
    final ThetaSketchJavaSerializable sketch1, final ThetaSketchJavaSerializable sketch2) {
  if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
    return emptySketchWrapped;
  }
  if (sketch1.getSketch() == null) {
    return sketch2;
  }
  if (sketch2.getSketch() == null) {
    return sketch1;
  }
  final CompactSketch compactSketch1 = sketch1.getCompactSketch();
  final CompactSketch compactSketch2 = sketch2.getCompactSketch();
  return new ThetaSketchJavaSerializable(
      new SetOperationBuilder().buildUnion().union(compactSketch1, compactSketch2));
}
```
```java
JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
    pairs.aggregateByKey(
        new ThetaSketchJavaSerializable(),
        Integer.parseInt(
```
Looks like we may be able to skip passing this and rely on Spark defaults? Can you do a bit of research to verify?
```java
Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) {
  for (String columnName : columns) {
    writer.add(
        new Blob(
```
@findepi @marton-bod, it would be good if you can take a look as well, to verify interop between Spark and Trino here. Here it seems we are storing the serialized sketch, as specified in the spec. Should we store the ndv as well in 'metadata', as specified in the spec: https://github.com/apache/iceberg/blob/main/format/puffin-spec.md#apache-datasketches-theta-v1-blob-type (does this mean properties?)
Thanks @szehon-ho for the ping. There are a couple of potential issues:

- the blob needs to have the ndv property
- the sketch needs to be updated with the standard byte[] representation of values (Conversions.toByteBuffer)
- there should be an inter-op test: Spark Action to Analyze table #10288 (comment)
- I am not exactly sure what the lifecycle of ThetaSketchJavaSerializable is and whether it can impact the final results; I need to re-read this portion
```java
 * @param types set of statistics to be collected
 * @return this for method chaining
 */
AnalyzeTable types(Set<String> types);
```
What are the allowed values for the types parameter? How can someone interacting with the Javadoc learn this? Is it "stats types", "blob types", or something else? If "blob types", we could link to https://iceberg.apache.org/puffin-spec/#blob-types, but I don't think we can assume that all known blob types will be supported by the code at all times.
I think we should use the blob type in the Action and the stats type in the Procedure (from where we could map stats to their blob type(s)?). For example, if NDV supports 2 blob types and the user wants to generate only one of those, that would still be possible from the actions.
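The mapping described above (a procedure-level stats name resolving to the action-level blob type(s) that implement it) could look roughly like this. The "ndv" key is a hypothetical user-facing name, not Iceberg API; "apache-datasketches-theta-v1" is the standard blob type string from the Puffin spec:

```java
import java.util.Map;
import java.util.Set;

public class StatsTypeMapping {
    // Procedure-level statistic name -> Puffin blob type(s) implementing it.
    private static final Map<String, Set<String>> STATS_TO_BLOB_TYPES =
        Map.of("ndv", Set.of("apache-datasketches-theta-v1"));

    public static Set<String> blobTypesFor(String statsType) {
        Set<String> types = STATS_TO_BLOB_TYPES.get(statsType);
        if (types == null) {
            // Fail fast with the offending type in the message.
            throw new IllegalArgumentException("Unsupported stats type: " + statsType);
        }
        return types;
    }
}
```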
```java
 * href="https://datasketches.apache.org/">Apache DataSketches</a> library
 */
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> blobTypes() {
```
Is it supposed to return "all standard blob types"? Should the name reflect that? If we did #8202, would that new blob type be added to this method?
gradle/libs.versions.toml (outdated):

```diff
@@ -33,7 +33,7 @@ azuresdk-bom = "1.2.23"
 awssdk-s3accessgrants = "2.0.0"
 caffeine = "2.9.3"
 calcite = "1.10.0"
-datasketches = "6.0.0"
+datasketches="6.0.0"
```
please revert
```java
ImmutableList.of(table.schema().findField(columnName).fieldId()),
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()),
```
I think there should be a compact() (perhaps not here, but inside computeNDVSketches).
BTW, it would be good to have a cross-engine compatibility test to ensure the value we write here can indeed be used correctly by other engines. For Trino, you can use https://java.testcontainers.org/modules/databases/trino/. Trino already has such tests, but they don't cover the Iceberg Spark features being implemented here.
```java
}
final byte[] serializedSketchBytes = new byte[length];
in.readFully(serializedSketchBytes);
sketch = Sketches.wrapSketch(Memory.wrap(serializedSketchBytes));
```
We wrote a compact sketch, so we can use CompactSketch.wrap here.
```java
List<Tuple2<String, String>> columnsList =
    Lists.newArrayListWithExpectedSize(columns.size());
for (int i = 0; i < row.size(); i++) {
  columnsList.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
```
This shouldn't use toString; it should use Conversions.toByteBuffer. See https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type
Force-pushed from c189b28 to 7b2cbce.
A lot changed; skimming for now. Will have to re-read this.
```java
 * href="https://datasketches.apache.org/">Apache DataSketches</a> library
 */
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> allStandardBlobTypes() {
```
Do we still need this method?
Removed
```java
private final Set<String> supportedBlobTypes =
    ImmutableSet.of(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1);
```
Can this be static?
```java
private Set<String> columns;
private Set<String> blobTypesToAnalyze = supportedBlobTypes;
```
What about combining these into a list of (blob type, columns) pairs? This might be necessary when we add support for new blob types. See https://github.com/apache/iceberg/pull/10288/files#r1639547902
```java
private Long snapshotId;

AnalyzeTableSparkAction(SparkSession spark, Table table) {
  super(spark);
  this.table = table;
  Snapshot snapshot = table.currentSnapshot();
  ValidationException.check(snapshot != null, "Cannot analyze a table that has no snapshots");
```
It would be nice to handle this case gracefully. A table without snapshots is an empty table (no data). Also, stats are assigned to snapshots, and since there is no snapshot, there cannot be a stats file created. Thus there is only one way to handle this gracefully: just no-op. I believe this would be better from a user perspective than just throwing.
.type(statsName)
.addAllErrors(Lists.newArrayList("Stats type not supported"))
.build();
throw new UnsupportedOperationException();
In case this exception is thrown (due to future code modifications), it could be helpful to include the type in the exception message.
public AnalyzeTable snapshot(long snapId) {
  this.snapshotId = snapId;
nit: snapId -> snapshotId
renamed
Sets.newHashSet(StandardBlobTypes.blobTypes()).containsAll(statisticTypes),
    "type not supported");
this.types = statisticTypes;
public AnalyzeTable blobTypes(Set<String> types) {
nit: types -> blobTypes (just like in the interface method declaration)
Renamed
Force-pushed 7b2cbce to deee51c (Compare)
Thanks @findepi for your review.
I addressed the comments, please take a look when you get a chance.
One pending item is the interoperability test.
* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
I can see a few databases that allow collecting all the stats for given columns.
PostgreSQL:
ANALYZE table_name (column1, column2);
Oracle:
EXEC DBMS_STATS.GATHER_TABLE_STATS('schema_name', 'table_name', 'method_opt' => 'FOR COLUMNS column1, column2');
Also, most databases don't seem to allow specifying the types of stats to be collected. Though we take the type as input in the API, we could restrict the usage by not exposing it in the procedure?
"correlation blob type for {B,C} together" -- this is the stats on the combined value of B and C. Is my understanding right? Is this a common use case? I didn't find databases supporting it by default.
* @param types set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable types(Set<String> types);
I think we should use the blob type in the Action and the stats type in the Procedure (from where we could map each stat to its blob type(s)?). For example, if NDV supports two blob types and the user wants to generate only one of them, that would still be possible from the action.
@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
 * href="https://datasketches.apache.org/">Apache DataSketches</a> library
 */
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> allStandardBlobTypes() {
Removed
Sets.newHashSet(StandardBlobTypes.blobTypes()).containsAll(statisticTypes),
    "type not supported");
this.types = statisticTypes;
public AnalyzeTable blobTypes(Set<String> types) {
Renamed
public AnalyzeTable snapshot(long snapId) {
  this.snapshotId = snapId;
renamed
return type == Type.TypeID.INTEGER
    || type == Type.TypeID.LONG
    || type == Type.TypeID.STRING
    || type == Type.TypeID.DOUBLE;
These were the data types supported by the sketch lib. I think this is no longer relevant with Conversions.toByteBuffer.
AnalyzeTable blobTypes(Set<String> blobTypes);

/**
 * id of the snapshot for which stats need to be collected
Nit: capitalize id (at least first character, or even ID works)
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Good point, snapshot should be fine.
* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
@findepi Is this resolved? Is there a strong use case to support specification of (type, column) pair?
How about: columnStats(Set types, String ... columns).
If types is not specified, then we can set it to all the supported types.
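The columnStats(Set types, String... columns) shape discussed above is essentially a fluent-builder question. A toy sketch of such a chainable action, with defaulting at execute() time — all names here (ToyAnalyzeAction etc.) are hypothetical, not the proposed Iceberg API:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy chainable action: each setter returns `this`, and unset options fall
// back to a default at execute() time (here, "analyze all columns").
class ToyAnalyzeAction {
    private final Set<String> selected = new LinkedHashSet<>();
    private final Set<String> allColumns;

    ToyAnalyzeAction(Set<String> allColumns) {
        this.allColumns = allColumns;
    }

    ToyAnalyzeAction columns(String... names) {
        for (String name : names) {
            selected.add(name);
        }
        return this; // enables method chaining
    }

    Set<String> execute() {
        // default to every column when none were explicitly selected
        return selected.isEmpty() ? allColumns : selected;
    }
}
```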
try {
  return generateNDVBlobs().stream();
} catch (Exception e) {
  LOG.error(
Question, it seems simpler to throw an exception, what is the motivation for this error handling.
Since there can be more than one type of statistic to collect, I was thinking about returning the errors for each stat type in the results. But this doesn't seem to be common, at least in RDBMS land. I changed the Result to not send error messages and to throw an exception instead.
throw new UnsupportedOperationException(
    String.format("%s is not supported", type));
}
return Stream.empty();
This can be moved up into the catch block, as it doesn't make much sense here.
}

private static ThetaSketchJavaSerializable combine(
    final ThetaSketchJavaSerializable sketch1, final ThetaSketchJavaSerializable sketch2) {
Nit: remove the finals here
return sketch1;
}

final CompactSketch compactSketch1 = sketch1.getCompactSketch();
Nit: remove final
return sketch;
}

private static ThetaSketchJavaSerializable combine(
I just realized: why can't we move this logic to ThetaSketchJavaSerializable itself? Then update/combine can just be one-liners, and probably inlined, i.e.:
colNameAndSketchPair.aggregateByKey(
    new ThetaSketchJavaSerializable(),
    ThetaSketchJavaSerializable::update,
    ThetaSketchJavaSerializable::combine,
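The aggregateByKey shape suggested above needs a zero value plus an update function (seqOp) folded per partition and an associative combine function (combOp) merging partitions. A plain-Java sketch of that contract, with a HashSet standing in for the theta sketch — DistinctAcc is hypothetical, only to show why the method-reference form works:

```java
import java.util.HashSet;
import java.util.Set;

// Accumulator with the zero/update/combine shape Spark's aggregateByKey
// expects; a HashSet stands in for ThetaSketchJavaSerializable here.
class DistinctAcc {
    final Set<String> values = new HashSet<>();

    // seqOp: fold one row value into a per-partition accumulator
    static DistinctAcc update(DistinctAcc acc, String value) {
        acc.values.add(value);
        return acc;
    }

    // combOp: merge two per-partition accumulators; must be associative and
    // must treat a fresh DistinctAcc (the zero value) as an identity
    static DistinctAcc combine(DistinctAcc left, DistinctAcc right) {
        left.values.addAll(right.values);
        return left;
    }

    long estimate() {
        return values.size();
    }
}
```

With static methods of this shape, the call site really can collapse to aggregateByKey(zero, Type::update, Type::combine) via method references.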
Force-pushed deee51c to 43743d7 (Compare)
Thanks, looks a lot better; some more comments.
* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Nit: columnNames => columns for simplicity, there is also precedent in RewriteDataFiles for example
super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
I wonder what you think, would it be simpler to do the error check here? (instead of doExecute)
Preconditions.checkNotNull(snapshot, "Cannot analyze an empty table")
Then this.snapshotToAnalyze can just be a primitive long (no need to worry about nulls?) The argument for the setter is already "long" type.
this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
  snapshotToAnalyze = snapshot.snapshotId();
Nit: we typically use this when setting member variables, ie this.snapshotToAnalyze
private final Table table;
private Set<String> columns;
private Long snapshotToAnalyze;
Nit: snapshotToAnalyze => snapshotId? (I feel 'toAnalyze' is apparent, also since it's not actually a Snapshot object.) If it's to avoid hiding a variable, maybe we can rename the other one instead, as this one is used in more places and renaming it saves more unnecessary characters.
if (snapshot != null) {
  snapshotToAnalyze = snapshot.snapshotId();
}
columns =
Same, this.columns = ...
Schema schema = table.schema();
List<Types.NestedField> nestedFields =
    columns.stream().map(schema::findField).collect(Collectors.toList());
final JavaPairRDD<String, ByteBuffer> colNameAndSketchPairRDD =
Nit: remove final
return (CompactSketch) sketch;
}

void update(final ByteBuffer value) {
can we remove final?
return sketch.getEstimate();
}

private void writeObject(final ObjectOutputStream out) throws IOException {
Same: remove final (and in the next method too)
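For readers skimming the writeObject/readObject lines above: ThetaSketchJavaSerializable uses Java's custom-serialization hooks to write the wrapped (non-Serializable) sketch as a byte payload. A minimal stand-alone version of the same pattern, with a String payload in place of the sketch — SerializableWrapper is hypothetical, not the PR's class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Custom-serialization pattern used by wrappers like
// ThetaSketchJavaSerializable: the wrapped state is transient and is
// written/restored explicitly as a length-prefixed byte payload.
class SerializableWrapper implements Serializable {
    private transient String payload; // stands in for a non-serializable sketch

    SerializableWrapper(String payload) {
        this.payload = payload;
    }

    String payload() {
        return payload;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private void readObject(ObjectInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        this.payload = new String(bytes, StandardCharsets.UTF_8);
    }

    // Serialize and deserialize in memory, as a Spark shuffle would.
    static SerializableWrapper roundTrip(SerializableWrapper w) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(w);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableWrapper) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```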
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchGenerator.java
.collect(Collectors.toList());
}

static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
- any reason this method is not private?
- how about this method return the Map directly to let the main method be cleaner?
Force-pushed 43743d7 to fa2f4b9 (Compare)
private Result doExecute() {
  LOG.info("Starting analysis of {} for snapshot {}", table.name(), snapshotId);
  List<Blob> blobs =
      supportedBlobTypes.stream()
This looks a bit silly, as we define supportedTypes ourselves (we decided it would not be configurable in the beginning) and then just check whether it is APACHE_DATASKETCHES_THETA_V1. Should we just simplify it?
return columns.stream()
    .map(
        columnName -> {
          Sketch sketch = sketchMap.get(columnName).getSketch();
Actually, I just realized this: why do we need the previous method to pass in a map of column names? It seems we always use the column id, so we could simplify the collect Spark job?
I think it makes sense to generate a map between ids and column names.
As for the previous method passing column names, we need it since we compose the dataframe to select columns based on column names:
Dataset<Row> data =
spark
.read()
.option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
.table(tableName)
.select(columns.stream().map(functions::col).toArray(Column[]::new));
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestAnalyzeTableAction.java
Table table = Spark3Util.loadIcebergTable(spark, tableName);
SparkActions actions = SparkActions.get();
AnalyzeTable.Result results = actions.analyzeTable(table).columns("id", "data").execute();
actions.analyzeTable(table).columns("id", "data").execute();
option: should we switch this to just one column of the table? (as the other one already calls with two columns, albeit implicitly)
The second invocation is not needed; it was added by mistake. Removed now.
Assertions.assertEquals(1, table.statisticsFiles().size());
Assertions.assertEquals(2, table.statisticsFiles().get(0).blobMetadata().size());
assertNotEquals(0, table.statisticsFiles().get(0).fileSizeInBytes());
Nit: it doesn't hurt to also assert that the NDV is collected here?
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestAnalyzeTableAction.java
Force-pushed fa2f4b9 to e4ff70c (Compare)
Snapshot snapshot = table.currentSnapshot();
if (snapshot == null) {
  LOG.error("Unable to analyze the table since the table has no snapshots");
  throw new RuntimeException("Snapshot id is null");
Nit: update exception message to more reflect error? (like the log message?)
static ThetaSketchJavaSerializable combineSketch(
    ThetaSketchJavaSerializable sketch1, ThetaSketchJavaSerializable sketch2) {
  ThetaSketchJavaSerializable emptySketchWrapped =
Nit: can we just inline this in the return value? It doesnt seem to be used elsewhere.
Sorry! final few comments on javadoc and small consistency nit
/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> {
  /**
   * The set of columns to be analyzed
Choose the set of columns to be analyzed, by default all columns are analyzed.
AnalyzeTable columns(String... columns);

/**
 * Id of the snapshot for which stats need to be collected
Choose the table snapshot to analyze, by default the current snapshot is analyzed.
/**
 * Id of the snapshot for which stats need to be collected
 *
 * @param snapshotId long id of the snapshot for which stats need to be collected
'to be collected' => 'analyzed' to be consistent with previous javadoc?
 */
AnalyzeTable snapshot(long snapshotId);

/** The action result that contains summaries of the Analysis. */
Analysis can be lower case, as it's not a class object.
}

private static Map<Integer, ThetaSketchJavaSerializable> computeNDVSketches(
    SparkSession spark, Table table, long snapshotId, Set<String> toBeAnalyzedColumns) {
Nit: columnsToBeAnalyzed to be consistent with above method
RuntimeException exception =
    assertThrows(
        RuntimeException.class, () -> actions.analyzeTable(table).columns("id").execute());
assertTrue(exception.getMessage().contains("Snapshot id is null"));
Probably need to change this message?
I'll have some time to take a look this week.
@@ -70,4 +70,10 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
  throw new UnsupportedOperationException(
      this.getClass().getName() + " does not implement rewritePositionDeletes");
}

/** Instantiates an action to analyze tables */
default AnalyzeTable analyzeTable(Table table) {
Question: have we considered other names like computeTableStats or refreshTableStats to be a bit more specific? What naming is used in other engines? I'd be curious to hear from everyone.
+1, I have the same concern about this generic name.
I understand there is an ANALYZE command in Spark, but I wonder how to handle partition stats in the future (e.g. whether they should be computed as part of analyze or separately).
Maybe collect all the stats by default, and have an option to compute specific stats (like distinct count or partition stats) individually.
@@ -70,4 +70,10 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
  throw new UnsupportedOperationException(
      this.getClass().getName() + " does not implement rewritePositionDeletes");
}

/** Instantiates an action to analyze tables */
Minor: missing . at the end?
interface Result {

  /** Returns statistics file. */
  StatisticsFile statisticFile();
I think we are missing an s here: statisticFile() -> statisticsFile().
AnalyzeTable columns(String... columns);

/**
 * Id of the snapshot for which stats need to be collected
Minor: id -> ID everywhere.
@@ -59,6 +59,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
  implementation project(':iceberg-parquet')
  implementation project(':iceberg-arrow')
  implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
  implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")
Does Spark by any chance ship this? Do we have to worry about conflicts?
Thanks for catching this. Looks like sql/catalyst uses the same library.
Should we shade this here and pin the version?
I have shaded the library
super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot == null) {
Hm, shouldn't we simply gracefully return in this case? Why throw an exception?
Makes sense. This action currently returns the statistics file as output. In the case of a table with no snapshots, should we return null for the result?
Force-pushed 7525f36 to cc90599 (Compare)
Force-pushed cc90599 to 9013790 (Compare)
This change adds a Spark action to analyze tables. As part of the analysis, the action generates an Apache DataSketches theta sketch for NDV stats and writes it as a Puffin file.