Skip to content

Commit

Permalink
AWS: Retain Glue Catalog column comment (#10276)
Browse files Browse the repository at this point in the history
* Retain Glue Catalog column comment

* merge handling existing column comment to creating column set

* apply spotless

* Add newline after else block for improved readability
  • Loading branch information
lawofcycles committed Jul 5, 2024
1 parent fc213cc commit 9a0fc49
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.Table;
Expand Down Expand Up @@ -158,4 +161,37 @@ public static void updateTableDescription(
.build();
glue.updateTable(request);
}

public static void updateTableColumns(
String namespace, String tableName, Function<Column, Column> columnUpdater) {
GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
Table existingTable = response.table();
List<Column> updatedColumns =
existingTable.storageDescriptor().columns().stream()
.map(columnUpdater)
.collect(Collectors.toList());

UpdateTableRequest request =
UpdateTableRequest.builder()
.catalogId(existingTable.catalogId())
.databaseName(existingTable.databaseName())
.tableInput(
TableInput.builder()
.description(existingTable.description())
.name(existingTable.name())
.partitionKeys(existingTable.partitionKeys())
.tableType(existingTable.tableType())
.owner(existingTable.owner())
.parameters(existingTable.parameters())
.storageDescriptor(
existingTable
.storageDescriptor()
.toBuilder()
.columns(updatedColumns)
.build())
.build())
.build();
glue.updateTable(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,69 @@ public void testUpdateTable() {
assertThat(response.table().description()).isEqualTo(updatedComment);
}

@Test
public void testDropColumn() {
String namespace = createNamespace();
String tableName = createTable(namespace);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn("c2", Types.StringType.get(), "updated from Iceberg API")
.addColumn("c3", Types.StringType.get())
.commit();

updateTableColumns(
namespace,
tableName,
column -> {
if (column.name().equals("c3")) {
return column.toBuilder().comment("updated from Glue API").build();
} else {
return column;
}
});

table.updateSchema().deleteColumn("c2").deleteColumn("c3").commit();

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("string")
.comment("updated from Iceberg API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
.build(),
Column.builder()
.name("c3")
.type("string")
.comment("updated from Glue API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testRenameTable() {
String namespace = createNamespace();
Expand Down Expand Up @@ -514,6 +577,72 @@ public void testColumnCommentsAndParameters() {
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testGlueTableColumnCommentsPreserved() {
String namespace = createNamespace();
String tableName = createTable(namespace);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn("c2", Types.StringType.get())
.addColumn("c3", Types.StringType.get())
.commit();

updateTableColumns(
namespace,
tableName,
column -> {
if (column.name().equals("c2") || column.name().equals("c3")) {
return column.toBuilder().comment("updated from Glue API").build();
} else {
return column;
}
});

table
.updateSchema()
.updateColumn("c2", Types.StringType.get(), "updated from Iceberg API")
.commit();

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("string")
.comment("updated from Iceberg API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c3")
.type("string")
.comment("updated from Glue API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testTablePropsDefinedAtCatalogLevel() {
String namespace = createNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,10 @@ void persistGlueTable(
.skipArchive(awsProperties.glueCatalogSkipArchive())
.tableInput(
TableInput.builder()
// Call description before applyMutation so that applyMutation overwrites the
// description with the comment specified in the query
.description(glueTable.description())
.applyMutation(
builder ->
IcebergToGlueConverter.setTableInputInformation(builder, metadata))
IcebergToGlueConverter.setTableInputInformation(
builder, metadata, glueTable))
.name(tableName)
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.aws.glue;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -49,6 +50,7 @@
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;

class IcebergToGlueConverter {
Expand Down Expand Up @@ -219,6 +221,29 @@ static String getTableName(TableIdentifier tableIdentifier, boolean skipNameVali
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata) {
setTableInputInformation(tableInputBuilder, metadata, null);
}

/**
* Set Glue table input information based on Iceberg table metadata, optionally preserving
* comments from an existing Glue table's columns.
*
* <p>A best-effort conversion of Iceberg metadata to Glue table is performed to display Iceberg
* information in Glue, but such information is only intended for informational human read access
* through tools like UI or CLI, and should never be used by any query processing engine to infer
* information like schema, partition spec, etc. The source of truth is stored in the actual
* Iceberg metadata file defined by the metadata_location table property.
*
* <p>If an existing Glue table is provided, the comments from its columns will be preserved in
* the resulting Glue TableInput. This is useful when updating an existing Glue table to retain
* any user-defined comments on the columns.
*
* @param tableInputBuilder Glue TableInput builder
* @param metadata Iceberg table metadata
* @param existingTable optional existing Glue table, used to preserve column comments
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata, Table existingTable) {
try {
Map<String, String> properties = metadata.properties();
StorageDescriptor.Builder storageDescriptor = StorageDescriptor.builder();
Expand All @@ -231,11 +256,28 @@ static void setTableInputInformation(
.collect(Collectors.toSet()));
}

Optional.ofNullable(properties.get(GLUE_DESCRIPTION_KEY))
.ifPresent(tableInputBuilder::description);
String description = properties.get(GLUE_DESCRIPTION_KEY);
if (description != null) {
tableInputBuilder.description(description);
} else if (existingTable != null) {
Optional.ofNullable(existingTable.description()).ifPresent(tableInputBuilder::description);
}

Map<String, String> existingColumnMap = null;
if (existingTable != null) {
List<Column> existingColumns = existingTable.storageDescriptor().columns();
existingColumnMap =
existingColumns.stream()
.filter(column -> column.comment() != null)
.collect(Collectors.toMap(Column::name, Column::comment));
} else {
existingColumnMap = Collections.emptyMap();
}

List<Column> columns = toColumns(metadata, existingColumnMap);

tableInputBuilder.storageDescriptor(
storageDescriptor.location(metadata.location()).columns(toColumns(metadata)).build());
storageDescriptor.location(metadata.location()).columns(columns).build());
} catch (RuntimeException e) {
LOG.warn(
"Encountered unexpected exception while converting Iceberg metadata to Glue table information",
Expand Down Expand Up @@ -297,18 +339,20 @@ private static String toTypeString(Type type) {
}
}

private static List<Column> toColumns(TableMetadata metadata) {
private static List<Column> toColumns(
TableMetadata metadata, Map<String, String> existingColumnMap) {
List<Column> columns = Lists.newArrayList();
Set<String> addedNames = Sets.newHashSet();

for (NestedField field : metadata.schema().columns()) {
addColumnWithDedupe(columns, addedNames, field, true /* is current */);
addColumnWithDedupe(columns, addedNames, field, true /* is current */, existingColumnMap);
}

for (Schema schema : metadata.schemas()) {
if (schema.schemaId() != metadata.currentSchemaId()) {
for (NestedField field : schema.columns()) {
addColumnWithDedupe(columns, addedNames, field, false /* is not current */);
addColumnWithDedupe(
columns, addedNames, field, false /* is not current */, existingColumnMap);
}
}
}
Expand All @@ -317,19 +361,29 @@ private static List<Column> toColumns(TableMetadata metadata) {
}

private static void addColumnWithDedupe(
List<Column> columns, Set<String> dedupe, NestedField field, boolean isCurrent) {
List<Column> columns,
Set<String> dedupe,
NestedField field,
boolean isCurrent,
Map<String, String> existingColumnMap) {
if (!dedupe.contains(field.name())) {
columns.add(
Column.Builder builder =
Column.builder()
.name(field.name())
.type(toTypeString(field.type()))
.comment(field.doc())
.parameters(
ImmutableMap.of(
ICEBERG_FIELD_ID, Integer.toString(field.fieldId()),
ICEBERG_FIELD_OPTIONAL, Boolean.toString(field.isOptional()),
ICEBERG_FIELD_CURRENT, Boolean.toString(isCurrent)))
.build());
ICEBERG_FIELD_CURRENT, Boolean.toString(isCurrent)));

if (field.doc() != null && !field.doc().isEmpty()) {
builder.comment(field.doc());
} else if (existingColumnMap != null && existingColumnMap.containsKey(field.name())) {
builder.comment(existingColumnMap.get(field.name()));
}

columns.add(builder.build());
dedupe.add(field.name());
}
}
Expand Down
Loading

0 comments on commit 9a0fc49

Please sign in to comment.