Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark Action to Analyze table #10288

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

karuppayya
Copy link
Contributor

This change adds a Spark action to Analyze tables.
As part of analysis, the action generates Apache data - sketch for NDV stats and writes it as puffins.

@karuppayya
Copy link
Contributor Author

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Computes the statistic of the given columns and stores it as Puffin files. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnalyzeTableSparkAction is a generic name, I see that in future we want to compute the partition stats too. Which may not be written as puffin files.

Either we can change the change the naming to computeNDVSketches or make it generic such that any kind of stats can be computed from this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more on this, I think we should just call it computeNDVSketches and not mix it with partition stats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to follow the model of RDMS and Engines like Trino using ANALYZE TABLE <tblName> to collect all table level stats.
With a procedure per stats model, the user have to invoke procedure/action for every stats and
also with any new stats addition, the user need to ensure to update his code to call the new procedure/action.

not mix it with partition stats.

I think we could have partition stats as a separate action since it per partition, whereas this procedure can collect top level table stats.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karuppayya
I can see the tests in TestAnalyzeTableAction, it's working fine.
But have we tested in Spark, whether its working with a query like -
"Analyze table table1 compute statistics" ?

Because generally it gives the error
"[NOT_SUPPORTED_COMMAND_FOR_V2_TABLE] ANALYZE TABLE is not supported for v2 tables."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark doesnot have the grammar for Analyzing tables.
This PR introduces a Spark action. In subsequent PR, i plan to introduce a iceberg procedure to invoke the Spark action.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll raise a PR for the spec update if there's no objections?

thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karuppayya Thanks for the great work! Sorry I didn't have time to take a look at your PR earlier. For ANALYZE table, Spark has the following syntax:

ANALYZE TABLE table_identifier [ partition_spec ]
    COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col [ , ... ] | FOR ALL COLUMNS ]

For column-level stats, Spark computes

  • NDV
  • Max (for numeric, Date, and Timestamp only)
  • Min (for numeric, Date, and Timestamp only)
  • Null Count
  • Avg Len
  • Max Len

We probably want to ensure that Iceberg implementation aligns with Spark's current functionality.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @huaxingao do you mean, that this PR should take care of the other stats too apart from NDV, like MIN, MAX, NULL Count, etc ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @karuppayya are we planning to induce the other column-level stats in this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeesou , This PR is only for the NDV stats. (PR to propagate the stats in scan)

spark(), table, columnsToBeAnalyzed.toArray(new String[0]));
table
.updateStatistics()
.setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
Copy link
Member

@ajantha-bhat ajantha-bhat May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if table's current snapshot has modified concurrently by another client between like 117 to line 120?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good question. we should do #6442


public static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
SparkSession spark, String tableName, String... columns) {
String sql = String.format("select %s from %s", String.join(",", columns), tableName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also think about incremental update and update sketches from previous checkpoint. Querying whole table maybe not efficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, incremental need to be wired into the ends of write paths.
This procedure could exist in parallel, which could get stats of the whole table on demand.

assumeTrue(catalogName.equals("spark_catalog"));
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2')",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default format version itself v2 now. So, specifying it again is redundant.

String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
OutputFile outputFile = fileIO.newOutputFile(path);
try (PuffinWriter writer =
Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this name instead of "analyze table procedure".

@ajantha-bhat
Copy link
Member

there was an old PR on the same: #6582

@huaxingao
Copy link
Contributor

there was an old PR on the same: #6582

I don't have time to work on this, so karuppayya will take over. Thanks a lot @karuppayya for continuing the work.

Copy link
Contributor

@amogh-jahagirdar amogh-jahagirdar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @karuppayya @huaxingao @szehon-ho this is aewsome to see! I left a review of the API/implementation, still have yet to review the tests which look to be a WIP

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these stats be a Set<StandardBlobType> instead of arbitrary Strings? I feel like the API becomes more well defined in this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, StandardBlobType defines string constants not enums

Comment on lines 89 to 117
private void validateColumns() {
validateEmptyColumns();
validateTypes();
}

private void validateEmptyColumns() {
if (columnsToBeAnalyzed == null || columnsToBeAnalyzed.isEmpty()) {
throw new ValidationException("No columns to analyze for the table", table.name());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think this validation should just happen at the time of setting these on the action rather than at the execcution time.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think this interface should have a snapshot API to allow users to pass in a snapshot to generate the statistics for. If it's not specified then we can generate the statistics for the latest snapshot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support branch/tag as well? (I guess in subsequent pr)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the snapshot(String snapshotId) has been added

for branch/tag -- is it existing pattern to support this first class in APIs, or require the caller to convert the information they have (branch/tag) into a snapshot ID?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, snapshot should be fine.

Comment on lines 104 to 106
if (field == null) {
throw new ValidationException("No column with %s name in the table", columnName);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: new line after the if

SparkSession spark, Table table, long snapshotId, String... columnsToBeAnalyzed)
throws IOException {
Iterator<Tuple2<String, ThetaSketchJavaSerializable>> tuple2Iterator =
NDVSketchGenerator.computeNDVSketches(spark, table.name(), snapshotId, columnsToBeAnalyzed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does computeDVSketches need to be public? Seems like it can just be package private. Also nit, either way don't think you need the full qualified method name

import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSketchJavaSerializable implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public?

Comment on lines 46 to 53
if (sketch == null) {
return null;
}
if (sketch instanceof UpdateSketch) {
return sketch.compact();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: new line after if

null,
ImmutableMap.of()));
}
writer.finish();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Don't think you need the writer.finish() because the try with resources will close, and close will already finish

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you need to finish() to get the final fileSize() etc

table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null means that the file will be uncompressed. I think it makes sense not to compress these files by default since the sketch will be a single long per column, so it'll be quite small already and not worth paying the price of compression/decompression.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the sketch will be a single long per column

the sketch should be more than that, small number of kb iirc
Trino uses ZSTD for the blobs, and no compression for the footer.

Comment on lines 157 to 168
if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
return emptySketchWrapped;
}
if (sketch1.getSketch() == null) {
return sketch2;
}
if (sketch2.getSketch() == null) {
return sketch1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: new line after if

@karuppayya karuppayya force-pushed the analyze_action branch 3 times, most recently from 5538f6e to de520fc Compare June 4, 2024 17:55
Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @karuppayya thanks for the patch, I left a first round of comments.

* @param columns a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(Set<String> columns);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, how about String... columns (see RewriteDataFiles). same for the others

* @param statsToBeCollected set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call statistics? Like StatisticsFile. https://iceberg.apache.org/contribute/#java-style-guidelines I think it can interpreted differently but I think point 3 implies we should make it have the full spelling if possible, and we dont have abbreviations for API methods in most of code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also statsToBeCollected => types ?

AnalyzeTable columns(Set<String> columns);

/**
* A set of statistics to be collected on the given columns of the given table
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The set of statistics to be collected? (given columns, given tables is specified elsewhere)

*/
AnalyzeTable snapshot(String snapshotId);

/** The action result that contains a summary of the Analysis. */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plural? contains summaries of the analysis?

Also if capital, it can be a a javadoc link.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support branch/tag as well? (I guess in subsequent pr)

(PairFlatMapFunction<Iterator<Row>, String, String>)
input -> {
final List<Tuple2<String, String>> list = Lists.newArrayList();
while (input.hasNext()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use flatmap and mapToPair to make this more concise?

data.javaRDD().flatMap(r -> {
          List<Tuple2<String, String>> list =
            Lists.newArrayListWithExpectedSize(columns.size());
          for (int i = 0; i < r.size(); i++) {
            list.add(new Tuple2<>(columns.get(i), r.get(i).toString());
          }
          return list.iterator();
          }).mapToPair(t -> t);

return ImmutableAnalyzeTable.Result.builder().analysisResults(analysisResults).build();
}

private boolean analyzeableTypes(Set<String> columns) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to intellij, there is a typo (analyzable)

final JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
pairs.aggregateByKey(
new ThetaSketchJavaSerializable(),
1, // number of partitions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why limit to 1 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was just copied from datasketches example.
This value is used in the HashPartitioner behind the scenes.
Should we set it to spark.sql.shuffle.partitions?

return sketches.toLocalIterator();
}

static class Add
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use lambdas here for cleaner code? like

 (sketch, val) -> {
              sketch.update(val;);
              return sketch;
          },

The next one may be too complex to inline but maybe we can reduce the ugly java boilerplate

final Row row = input.next();
int size = row.size();
for (int i = 0; i < size; i++) {
list.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
Copy link
Collaborator

@szehon-ho szehon-ho Jun 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, does forcing string type affect anything? I see the sketch library takes in other types.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Started a second round


@Override
public AnalyzeTable snapshot(String snapshotIdStr) {
this.snapshotId = Long.parseLong(snapshotIdStr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should just make this take long


StatisticsFile statisticsFile =
NDVSketchGenerator.generateNDV(
spark(), table, snapshotId, columns.toArray(new String[0]));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do a similar thing with a columns() method as I suggested with snapshots, that checks if the user list is null/empty and sets it to all the table columns

  private Set<String> columns() {
    return (columns != null) && (columns.size() > 0) ?
      table.schema().columns().stream()
        .map(Types.NestedField::name)
        .collect(Collectors.toSet()) : 
      columns;
  }

Then the logic is centralized here if we have more stats, rather than in NDV class.

.type(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1)
.build();
} catch (IOException ioe) {
List<String> errors = Lists.newArrayList();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we only reporting error if we have IOException? (looks like from writing puffin file). It seems a bit strange just to catch this specific case, and not other exceptions.

It seems more natural to either catch all exceptions and report error, or else just throw all exceptions, what do you think?

private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);

private final Table table;
private Set<String> columns = ImmutableSet.of();
Copy link
Collaborator

@szehon-ho szehon-ho Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we convert this from array to set and back a few times (its passed in as array, stored as set, and then passed as array to NDVSketchGenerator.generateNDV function). Can we just keep this as array the whole time (store this as array here)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrays are mutable, so if we decide to switch to arrays, please make sure to defensive-copy whenever passing to another class

return sketches.toLocalIterator();
}

static class Combine
Copy link
Collaborator

@szehon-ho szehon-ho Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this to get rid of ugly Function2 definition, and also make the main method a bit cleaner.

    JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
        pairs.aggregateByKey(
            new ThetaSketchJavaSerializable(),
            Integer.parseInt(
                SQLConf.SHUFFLE_PARTITIONS().defaultValueString()), // number of partitions
            NDVSketchGenerator::update,
            NDVSketchGenerator::combine);

    return sketches.toLocalIterator();
  }

  public static ThetaSketchJavaSerializable update(ThetaSketchJavaSerializable sketch, String val) {
    sketch.update(val);
    return sketch;
  }

  public static ThetaSketchJavaSerializable combine(
        final ThetaSketchJavaSerializable sketch1, final ThetaSketchJavaSerializable sketch2) {
      if (sketch1.getSketch() == null && sketch2.getSketch() == null) {
        return emptySketchWrapped;
      }
      if (sketch1.getSketch() == null) {
        return sketch2;
      }
      if (sketch2.getSketch() == null) {
        return sketch1;
      }

      final CompactSketch compactSketch1 = sketch1.getCompactSketch();
      final CompactSketch compactSketch2 = sketch2.getCompactSketch();
      return new ThetaSketchJavaSerializable(
          new SetOperationBuilder().buildUnion().union(compactSketch1, compactSketch2));
    }

JavaPairRDD<String, ThetaSketchJavaSerializable> sketches =
pairs.aggregateByKey(
new ThetaSketchJavaSerializable(),
Integer.parseInt(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we may be able to skip passing this , and rely on Spark defaults?

Can you do a bit of research to verify?

Puffin.write(outputFile).createdBy("Iceberg Analyze action").build()) {
for (String columnName : columns) {
writer.add(
new Blob(
Copy link
Collaborator

@szehon-ho szehon-ho Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi @marton-bod would be good if you can take a look as well, to verify interop between Spark and Trino here?

Here it seems we are storing the serialized sketch, as is specified in the spec. Should we store ndv as well in 'metadata', as is specified in spec: https://github.com/apache/iceberg/blob/main/format/puffin-spec.md#apache-datasketches-theta-v1-blob-type (does this mean properties?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @szehon-ho for the ping
there are couple potential issues

  • the blob needs to have ndv property
  • the sketch needs to be updated with the standard byte[] representation of values (Conversions.toByteBuffer)
  • there should be inter-op test Spark Action to Analyze table #10288 (comment)
  • i am not exactly sure how what's the lifecycle of ThetaSketchJavaSerializable & whether this can impact the final results. need to re-read this portion

* @param types set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable types(Set<String> types);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are allowed values for the types parameter? How can someone interacting with the javadoc learn this?
is it "stats types", "blob types" or something else?
if "blob types", we could link to https://iceberg.apache.org/puffin-spec/#blob-types , but i don't think we can assume that all known blob types will be supported by the code at all times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the blob type in Action and stats type in the Procedure(from where we could map stats to its blob type(s)?)
For eg: if NDV supports 2 blob types and user wants to generate only one of those, that would still be possible from the actions.

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the snapshot(String snapshotId) has been added

for branch/tag -- is it existing pattern to support this first class in APIs, or require the caller to convert the information they have (branch/tag) into a snapshot ID?

@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> blobTypes() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it supposed to return "all standard blob types"?
should the name reflect that?

if we did #8202, would this new blob type be added to this method?

@@ -33,7 +33,7 @@ azuresdk-bom = "1.2.23"
awssdk-s3accessgrants = "2.0.0"
caffeine = "2.9.3"
calcite = "1.10.0"
datasketches = "6.0.0"
datasketches="6.0.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert

private static final Logger LOG = LoggerFactory.getLogger(AnalyzeTableSparkAction.class);

private final Table table;
private Set<String> columns = ImmutableSet.of();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrays are mutable, so if we decide to switch to arrays, please make sure to defensive-copy whenever passing to another class

table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columns.get(i)).getSketch().toByteArray()),
null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the sketch will be a single long per column

the sketch should be more than that, small number of kb iirc
Trino uses ZSTD for the blobs, and no compression for the footer.

null,
ImmutableMap.of()));
}
writer.finish();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you need to finish() to get the final fileSize() etc

ImmutableList.of(table.schema().findField(columnName).fieldId()),
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(sketchMap.get(columnName).getSketch().toByteArray()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think there should be compact() (perhaps not here, but inside computeNDVSketches

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW it would be good to have a cross-engine compatibility test to ensure the value we write here can indeed be used correctly by other engines. for trino, you can use https://java.testcontainers.org/modules/databases/trino/

Trino already has such tests, but that doesn't cover Iceberg Spark features that are being implemented.

}
final byte[] serializedSketchBytes = new byte[length];
in.readFully(serializedSketchBytes);
sketch = Sketches.wrapSketch(Memory.wrap(serializedSketchBytes));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wrote a compact sketch, so we can use CompactSketch.wrap here

List<Tuple2<String, String>> columnsList =
Lists.newArrayListWithExpectedSize(columns.size());
for (int i = 0; i < row.size(); i++) {
columnsList.add(new Tuple2<>(columns.get(i), row.get(i).toString()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't use toString
this should use Conversions.toByteBuffer
see https://iceberg.apache.org/puffin-spec/#apache-datasketches-theta-v1-blob-type

@karuppayya karuppayya force-pushed the analyze_action branch 2 times, most recently from c189b28 to 7b2cbce Compare June 17, 2024 23:41
Copy link
Collaborator

@findepi findepi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot changed, skimming for now. will have to re-read this

@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> allStandardBlobTypes() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

Comment on lines 58 to 59
private final Set<String> supportedBlobTypes =
ImmutableSet.of(StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be static?

Comment on lines 60 to 61
private Set<String> columns;
private Set<String> blobTypesToAnalyze = supportedBlobTypes;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about combining these into a list of (blob type, columns) pairs?
This might be necessary when we add support for new blob types.
see https://github.com/apache/iceberg/pull/10288/files#r1639547902

private Long snapshotId;

AnalyzeTableSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
ValidationException.check(snapshot != null, "Cannot analyze a table that has no snapshots");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to handle this case gracefully.
Table without snapshots is an empty table (no data).
Also, stats are assigned to snapshots, and there is no snapshot, so there cannot be a stats file created.
Thus there is only one way to handle this gracefully -- just no-op.
I believe this would be better from user-perspective than just throwing.

.type(statsName)
.addAllErrors(Lists.newArrayList("Stats type not supported"))
.build();
throw new UnsupportedOperationException();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case this exception is thrown (due to some code modifications in the future), it could be helpful to include type in the exception message.

Comment on lines 171 to 172
public AnalyzeTable snapshot(long snapId) {
this.snapshotId = snapId;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: snapId -> snapshotId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

Sets.newHashSet(StandardBlobTypes.blobTypes()).containsAll(statisticTypes),
"type not supported");
this.types = statisticTypes;
public AnalyzeTable blobTypes(Set<String> types) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: types -> blobTypes (just like in the interface method declaration)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed

Copy link
Contributor Author

@karuppayya karuppayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @findepi for your review.
I addressed the comments, please take a look when you get a chance.
One pending item is the interoperability test.

* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see few databases that allow collecting all the stats for given columns.
PostgreSQL
ANALYZE table_name (column1, column2);

Oracle
EXEC DBMS_STATS.GATHER_TABLE_STATS('schema_name', 'table_name', 'method_opt' => 'FOR COLUMNS column1, column2');

Also looks like most databases dont allow specifying types of stats to be collected.
Though we take type as input in the API, we can restrict the usage by not exposing it in the procedure?

correlation blob type for {B,C} together

This is the stats on the combined value of B and C. Is my understanding right?
Is this a common usecase since i didnt find databases supporting this by default

* @param types set of statistics to be collected
* @return this for method chaining
*/
AnalyzeTable types(Set<String> types);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the blob type in Action and stats type in the Procedure(from where we could map stats to its blob type(s)?)
For eg: if NDV supports 2 blob types and user wants to generate only one of those, that would still be possible from the actions.

@@ -26,4 +29,8 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static Set<String> allStandardBlobTypes() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

Sets.newHashSet(StandardBlobTypes.blobTypes()).containsAll(statisticTypes),
"type not supported");
this.types = statisticTypes;
public AnalyzeTable blobTypes(Set<String> types) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed

Comment on lines 171 to 172
public AnalyzeTable snapshot(long snapId) {
this.snapshotId = snapId;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed

Comment on lines 103 to 106
return type == Type.TypeID.INTEGER
|| type == Type.TypeID.LONG
|| type == Type.TypeID.STRING
|| type == Type.TypeID.DOUBLE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were the datatypes supported by the sketch lib. I think this no longer relevant with Conversions.toByteBuffer

AnalyzeTable blobTypes(Set<String> blobTypes);

/**
* id of the snapshot for which stats need to be collected
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: capitalize id (at least first character, or even ID works)

* @return this for method chaining
*/
AnalyzeTable stats(Set<String> statsToBeCollected);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, snapshot should be fine.

* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi Is this resolved? Is there a strong use case to support specification of (type, column) pair?

How about: columnStats(Set types, String ... columns).

If types is not specified, then we can set it to all the supported types.

try {
return generateNDVBlobs().stream();
} catch (Exception e) {
LOG.error(
Copy link
Collaborator

@szehon-ho szehon-ho Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, it seems simpler to throw an exception, what is the motivation for this error handling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there are can be more than one type of statistic to be collected, I was think about sending the errors with respect to each stat type in the results.
But looks like this is not something very common atleast in RDBMS land.
I changed the Result to not send error message and instead throwing an Exception now

throw new UnsupportedOperationException(
String.format("%s is not supported", type));
}
return Stream.empty();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved up to the catch code, as it doesnt make too much sense here.

}

private static ThetaSketchJavaSerializable combine(
final ThetaSketchJavaSerializable sketch1, final ThetaSketchJavaSerializable sketch2) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove the finals here

return sketch1;
}

final CompactSketch compactSketch1 = sketch1.getCompactSketch();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove final

return sketch;
}

private static ThetaSketchJavaSerializable combine(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realize, why cant we move this logic to ThetaSketchJavaSerializable itself?

Then update/combine can just be oneliners, and probably able to be inlined ie :

        colNameAndSketchPair.aggregateByKey(
            new ThetaSketchJavaSerializable(),
            ThetaSketchJavaSerializable::update,
            ThetaSketchJavaSerializable::combine,

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, looks a lot better , some more comments.

* @param columnNames a set of column names to be analyzed
* @return this for method chaining
*/
AnalyzeTable columns(String... columnNames);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: columnNames => columns for simplicity, there is also precedent in RewriteDataFiles for example

super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what you think, would it be simpler to do the error check here? (instead of doExecute)

Preconditions.checkNotNull(snapshot, "Cannot analyze an empty table")

Then this.snapshotToAnalyze can just be a primitive long (no need to worry about nulls?) The argument for the setter is already "long" type.

this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot != null) {
snapshotToAnalyze = snapshot.snapshotId();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we typically use this when setting member variables, ie this.snapshotToAnalyze


private final Table table;
private Set<String> columns;
private Long snapshotToAnalyze;
Copy link
Collaborator

@szehon-ho szehon-ho Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: snapshotToAnalyze => snapshotId? (I feel 'toAnalyze' is apparent, also as its not actually Snapshot object). If its to not hide a variable, maybe we can change the other one as this one is used in more places and it would save more unnecessary chars.

if (snapshot != null) {
snapshotToAnalyze = snapshot.snapshotId();
}
columns =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, this.columns = ...

Schema schema = table.schema();
List<Types.NestedField> nestedFields =
columns.stream().map(schema::findField).collect(Collectors.toList());
final JavaPairRDD<String, ByteBuffer> colNameAndSketchPairRDD =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit; remove final

return (CompactSketch) sketch;
}

void update(final ByteBuffer value) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove final?

return sketch.getEstimate();
}

private void writeObject(final ObjectOutputStream out) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remove final (and next method too)

.collect(Collectors.toList());
}

static Iterator<Tuple2<String, ThetaSketchJavaSerializable>> computeNDVSketches(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • any reason this method is not private?
  • how about this method return the Map directly to let the main method be cleaner?

private Result doExecute() {
LOG.info("Starting analysis of {} for snapshot {}", table.name(), snapshotId);
List<Blob> blobs =
supportedBlobTypes.stream()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks a bit silly , as we define the supportedTypes (as we decided not configurable in the beginning) and are just checking whether it is APACHE_DATASKETCHES_THETA_V1, should we just simplify it?

return columns.stream()
.map(
columnName -> {
Sketch sketch = sketchMap.get(columnName).getSketch();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually i just realize this, why do we need the previous method to pass in Map of column name? It seems we always use column id, we can simplify the collect spark job if so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to generate a map between ids and column names.
As for the previous method passing columns name,
we need it since we need to compose the dataframe to select columns based on columns names

Dataset<Row> data =
        spark
            .read()
            .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)
            .table(tableName)
            .select(columns.stream().map(functions::col).toArray(Column[]::new));

Table table = Spark3Util.loadIcebergTable(spark, tableName);
SparkActions actions = SparkActions.get();
AnalyzeTable.Result results = actions.analyzeTable(table).columns("id", "data").execute();
actions.analyzeTable(table).columns("id", "data").execute();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

option: should we switch this to just one column of the table? (as the other one already calls with two columns, albeit implicitly)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second invocaton is not needed, added by mistake.
Removed now


Assertions.assertEquals(1, table.statisticsFiles().size());
Assertions.assertEquals(2, table.statisticsFiles().get(0).blobMetadata().size());
assertNotEquals(0, table.statisticsFiles().get(0).fileSizeInBytes());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: doesnt hurt to also assert the ndv is collected here?

Snapshot snapshot = table.currentSnapshot();
if (snapshot == null) {
LOG.error("Unable to analyze the table since the table has no snapshots");
throw new RuntimeException("Snapshot id is null");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: update exception message to more reflect error? (like the log message?)


static ThetaSketchJavaSerializable combineSketch(
ThetaSketchJavaSerializable sketch1, ThetaSketchJavaSerializable sketch2) {
ThetaSketchJavaSerializable emptySketchWrapped =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we just inline this in the return value? It doesnt seem to be used elsewhere.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry! final few comments on javadoc and small consistency nit

/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
public interface AnalyzeTable extends Action<AnalyzeTable, AnalyzeTable.Result> {
/**
* The set of columns to be analyzed
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose the set of columns to be analyzed, by default all columns are analyzed.

AnalyzeTable columns(String... columns);

/**
* Id of the snapshot for which stats need to be collected
Copy link
Collaborator

@szehon-ho szehon-ho Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose the table snapshot to analyze, by default the current snapshot is analyzed.

/**
* Id of the snapshot for which stats need to be collected
*
* @param snapshotId long id of the snapshot for which stats need to be collected
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'to be collected' => 'analyzed' to be consistent with previous javadoc?

*/
AnalyzeTable snapshot(long snapshotId);

/** The action result that contains summaries of the Analysis. */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Analysis can be lower case, as its not a class object.

}

private static Map<Integer, ThetaSketchJavaSerializable> computeNDVSketches(
SparkSession spark, Table table, long snapshotId, Set<String> toBeAnalyzedColumns) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: columnsToBeAnalyzed to be consistent with above method

RuntimeException exception =
assertThrows(
RuntimeException.class, () -> actions.analyzeTable(table).columns("id").execute());
assertTrue(exception.getMessage().contains("Snapshot id is null"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably need to change this message?

@aokolnychyi
Copy link
Contributor

I'll have some time to take a look this week.

@@ -70,4 +70,10 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewritePositionDeletes");
}

/** Instantiates an action to analyze tables */
default AnalyzeTable analyzeTable(Table table) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Have we considered other names like computeTableStats or refreshTableStats to be a bit more specific? What naming is used in other engines? I'd be curious to hear from everyone.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I have a same concern about this generic name

#10288 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand there is ANALYZE command in Spark but I wonder how to handle partition stats in the future (e.g. like whether they should be computed as part of analyze or separately).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trino and Spark seems to be collecting stats specific partition level stats as part of the Analyze command grammar. (Or atleast they dont have a separate command to collect partition stats)
So went with the name, but open to changing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe by default collect all the stats and have option to compute specific stats like distinct count and partition stats by specifying individually.

@@ -70,4 +70,10 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewritePositionDeletes");
}

/** Instantiates an action to analyze tables */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Missing . at the end?

interface Result {

/** Returns statistics file. */
StatisticsFile statisticFile();
Copy link
Contributor

@aokolnychyi aokolnychyi Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are missing s here: statisticFile() -> statisticsFile().

AnalyzeTable columns(String... columns);

/**
* Id of the snapshot for which stats need to be collected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: id -> ID everywhere.

@@ -59,6 +59,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-arrow')
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}")
implementation("org.apache.datasketches:datasketches-java:${libs.versions.datasketches.get()}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Spark by any chance ship this? Do we have to worry about conflicts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. Looks like sql/catalyst uses the same library.
Should we shade this here and pin the version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have shaded the library

super(spark);
this.table = table;
Snapshot snapshot = table.currentSnapshot();
if (snapshot == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, shouldn't we simply gracefully return in this case? Why throw an exception?

Copy link
Contributor Author

@karuppayya karuppayya Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense,
This action current returns the statistic file as output.
In this case of a table with no snapshots, should we return null for results?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

10 participants