Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS: Retain Glue Catalog column comment after updating Iceberg table #10276

Merged
merged 5 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.Table;
Expand Down Expand Up @@ -158,4 +161,37 @@ public static void updateTableDescription(
.build();
glue.updateTable(request);
}

public static void updateTableColumns(
String namespace, String tableName, Function<Column, Column> columnUpdater) {
GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
Table existingTable = response.table();
List<Column> updatedColumns =
existingTable.storageDescriptor().columns().stream()
.map(columnUpdater)
.collect(Collectors.toList());

UpdateTableRequest request =
UpdateTableRequest.builder()
.catalogId(existingTable.catalogId())
.databaseName(existingTable.databaseName())
.tableInput(
TableInput.builder()
.description(existingTable.description())
.name(existingTable.name())
.partitionKeys(existingTable.partitionKeys())
.tableType(existingTable.tableType())
.owner(existingTable.owner())
.parameters(existingTable.parameters())
.storageDescriptor(
existingTable
.storageDescriptor()
.toBuilder()
.columns(updatedColumns)
.build())
.build())
.build();
glue.updateTable(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,69 @@ public void testUpdateTable() {
assertThat(response.table().description()).isEqualTo(updatedComment);
}

@Test
public void testDropColumn() {
String namespace = createNamespace();
String tableName = createTable(namespace);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn("c2", Types.StringType.get(), "updated from Iceberg API")
.addColumn("c3", Types.StringType.get())
.commit();

updateTableColumns(
namespace,
tableName,
column -> {
if (column.name().equals("c3")) {
return column.toBuilder().comment("updated from Glue API").build();
} else {
return column;
}
});

table.updateSchema().deleteColumn("c2").deleteColumn("c3").commit();
Fokko marked this conversation as resolved.
Show resolved Hide resolved

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("string")
.comment("updated from Iceberg API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
.build(),
Column.builder()
.name("c3")
.type("string")
.comment("updated from Glue API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testRenameTable() {
String namespace = createNamespace();
Expand Down Expand Up @@ -514,6 +577,72 @@ public void testColumnCommentsAndParameters() {
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testGlueTableColumnCommentsPreserved() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the case where we drop a column from a table. I believe today we keep those columns and descriptions. Should we add a test for that?

String namespace = createNamespace();
String tableName = createTable(namespace);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn("c2", Types.StringType.get())
.addColumn("c3", Types.StringType.get())
.commit();

updateTableColumns(
namespace,
tableName,
column -> {
if (column.name().equals("c2") || column.name().equals("c3")) {
return column.toBuilder().comment("updated from Glue API").build();
} else {
return column;
}
});

table
.updateSchema()
.updateColumn("c2", Types.StringType.get(), "updated from Iceberg API")
.commit();

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("string")
.comment("updated from Iceberg API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c3")
.type("string")
.comment("updated from Glue API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testTablePropsDefinedAtCatalogLevel() {
String namespace = createNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,10 @@ void persistGlueTable(
.skipArchive(awsProperties.glueCatalogSkipArchive())
.tableInput(
TableInput.builder()
// Call description before applyMutation so that applyMutation overwrites the
// description with the comment specified in the query
.description(glueTable.description())
.applyMutation(
builder ->
IcebergToGlueConverter.setTableInputInformation(builder, metadata))
IcebergToGlueConverter.setTableInputInformation(
builder, metadata, glueTable))
.name(tableName)
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.aws.glue;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -49,6 +50,7 @@
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;

class IcebergToGlueConverter {
Expand Down Expand Up @@ -219,6 +221,29 @@ static String getTableName(TableIdentifier tableIdentifier, boolean skipNameVali
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata) {
setTableInputInformation(tableInputBuilder, metadata, null);
}

/**
* Set Glue table input information based on Iceberg table metadata, optionally preserving
* comments from an existing Glue table's columns.
*
* <p>A best-effort conversion of Iceberg metadata to Glue table is performed to display Iceberg
* information in Glue, but such information is only intended for informational human read access
* through tools like UI or CLI, and should never be used by any query processing engine to infer
* information like schema, partition spec, etc. The source of truth is stored in the actual
* Iceberg metadata file defined by the metadata_location table property.
*
* <p>If an existing Glue table is provided, the comments from its columns will be preserved in
* the resulting Glue TableInput. This is useful when updating an existing Glue table to retain
* any user-defined comments on the columns.
*
* @param tableInputBuilder Glue TableInput builder
* @param metadata Iceberg table metadata
* @param existingTable optional existing Glue table, used to preserve column comments
*/
amogh-jahagirdar marked this conversation as resolved.
Show resolved Hide resolved
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata, Table existingTable) {
try {
Map<String, String> properties = metadata.properties();
StorageDescriptor.Builder storageDescriptor = StorageDescriptor.builder();
Expand All @@ -231,11 +256,27 @@ static void setTableInputInformation(
.collect(Collectors.toSet()));
}

Optional.ofNullable(properties.get(GLUE_DESCRIPTION_KEY))
.ifPresent(tableInputBuilder::description);
String description = properties.get(GLUE_DESCRIPTION_KEY);
if (description != null) {
tableInputBuilder.description(description);
} else if (existingTable != null) {
Optional.ofNullable(existingTable.description()).ifPresent(tableInputBuilder::description);
}

Map<String, String> existingColumnMap = null;
if (existingTable != null) {
List<Column> existingColumns = existingTable.storageDescriptor().columns();
existingColumnMap =
existingColumns.stream()
.filter(column -> column.comment() != null)
.collect(Collectors.toMap(Column::name, Column::comment));
} else {
existingColumnMap = Collections.emptyMap();
}
List<Column> columns = toColumns(metadata, existingColumnMap);
amogh-jahagirdar marked this conversation as resolved.
Show resolved Hide resolved

tableInputBuilder.storageDescriptor(
storageDescriptor.location(metadata.location()).columns(toColumns(metadata)).build());
storageDescriptor.location(metadata.location()).columns(columns).build());
} catch (RuntimeException e) {
LOG.warn(
"Encountered unexpected exception while converting Iceberg metadata to Glue table information",
Expand Down Expand Up @@ -297,18 +338,20 @@ private static String toTypeString(Type type) {
}
}

private static List<Column> toColumns(TableMetadata metadata) {
private static List<Column> toColumns(
TableMetadata metadata, Map<String, String> existingColumnMap) {
List<Column> columns = Lists.newArrayList();
Set<String> addedNames = Sets.newHashSet();

for (NestedField field : metadata.schema().columns()) {
addColumnWithDedupe(columns, addedNames, field, true /* is current */);
addColumnWithDedupe(columns, addedNames, field, true /* is current */, existingColumnMap);
}

for (Schema schema : metadata.schemas()) {
if (schema.schemaId() != metadata.currentSchemaId()) {
for (NestedField field : schema.columns()) {
addColumnWithDedupe(columns, addedNames, field, false /* is not current */);
addColumnWithDedupe(
columns, addedNames, field, false /* is not current */, existingColumnMap);
}
}
}
Expand All @@ -317,19 +360,29 @@ private static List<Column> toColumns(TableMetadata metadata) {
}

private static void addColumnWithDedupe(
List<Column> columns, Set<String> dedupe, NestedField field, boolean isCurrent) {
List<Column> columns,
Set<String> dedupe,
NestedField field,
boolean isCurrent,
Map<String, String> existingColumnMap) {
if (!dedupe.contains(field.name())) {
columns.add(
Column.Builder builder =
Column.builder()
.name(field.name())
.type(toTypeString(field.type()))
.comment(field.doc())
.parameters(
ImmutableMap.of(
ICEBERG_FIELD_ID, Integer.toString(field.fieldId()),
ICEBERG_FIELD_OPTIONAL, Boolean.toString(field.isOptional()),
ICEBERG_FIELD_CURRENT, Boolean.toString(isCurrent)))
.build());
ICEBERG_FIELD_CURRENT, Boolean.toString(isCurrent)));

if (field.doc() != null && !field.doc().isEmpty()) {
builder.comment(field.doc());
} else if (existingColumnMap != null && existingColumnMap.containsKey(field.name())) {
builder.comment(existingColumnMap.get(field.name()));
}

columns.add(builder.build());
dedupe.add(field.name());
}
}
Expand Down
Loading