Skip to content

Commit

Permalink
Retain Glue Catalog column comment
Browse files Browse the repository at this point in the history
  • Loading branch information
lawofcycles committed May 29, 2024
1 parent f9cdde2 commit d094968
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
Expand All @@ -39,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.Table;
Expand Down Expand Up @@ -158,4 +161,37 @@ public static void updateTableDescription(
.build();
glue.updateTable(request);
}

public static void updateTableColumns(
String namespace, String tableName, Function<Column, Column> columnUpdater) {
GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
Table existingTable = response.table();
List<Column> updatedColumns =
existingTable.storageDescriptor().columns().stream()
.map(columnUpdater)
.collect(Collectors.toList());

UpdateTableRequest request =
UpdateTableRequest.builder()
.catalogId(existingTable.catalogId())
.databaseName(existingTable.databaseName())
.tableInput(
TableInput.builder()
.description(existingTable.description())
.name(existingTable.name())
.partitionKeys(existingTable.partitionKeys())
.tableType(existingTable.tableType())
.owner(existingTable.owner())
.parameters(existingTable.parameters())
.storageDescriptor(
existingTable
.storageDescriptor()
.toBuilder()
.columns(updatedColumns)
.build())
.build())
.build();
glue.updateTable(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,69 @@ public void testUpdateTable() {
assertThat(response.table().description()).isEqualTo(updatedComment);
}

@Test
public void testDropColumn() {
String namespace = createNamespace();
String tableName = createTable(namespace);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn("c2", Types.StringType.get(), "updated from Iceberg API")
.addColumn("c3", Types.StringType.get())
.commit();

updateTableColumns(
namespace,
tableName,
column -> {
if (column.name().equals("c3")) {
return column.toBuilder().comment("updated from Glue API").build();
} else {
return column;
}
});

table.updateSchema().deleteColumn("c2").deleteColumn("c3").commit();

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("string")
.comment("updated from Iceberg API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
.build(),
Column.builder()
.name("c3")
.type("string")
.comment("updated from Glue API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "false"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testRenameTable() {
String namespace = createNamespace();
Expand Down Expand Up @@ -514,6 +577,72 @@ public void testColumnCommentsAndParameters() {
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testGlueTableColumnCommentsPreserved() {
String namespace = createNamespace();
String tableName = createTable(namespace);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn("c2", Types.StringType.get())
.addColumn("c3", Types.StringType.get())
.commit();

updateTableColumns(
namespace,
tableName,
column -> {
if (column.name().equals("c2") || column.name().equals("c3")) {
return column.toBuilder().comment("updated from Glue API").build();
} else {
return column;
}
});

table
.updateSchema()
.updateColumn("c2", Types.StringType.get(), "updated from Iceberg API")
.commit();

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("string")
.comment("updated from Iceberg API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c3")
.type("string")
.comment("updated from Glue API")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}

@Test
public void testTablePropsDefinedAtCatalogLevel() {
String namespace = createNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,10 @@ void persistGlueTable(
.skipArchive(awsProperties.glueCatalogSkipArchive())
.tableInput(
TableInput.builder()
// Call description before applyMutation so that applyMutation overwrites the
// description with the comment specified in the query
.description(glueTable.description())
.applyMutation(
builder ->
IcebergToGlueConverter.setTableInputInformation(builder, metadata))
IcebergToGlueConverter.setTableInputInformation(
builder, metadata, glueTable))
.name(tableName)
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;

class IcebergToGlueConverter {
Expand Down Expand Up @@ -219,6 +220,29 @@ static String getTableName(TableIdentifier tableIdentifier, boolean skipNameVali
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata) {
setTableInputInformation(tableInputBuilder, metadata, null);
}

/**
* Set Glue table input information based on Iceberg table metadata, optionally preserving
* comments from an existing Glue table's columns.
*
* <p>A best-effort conversion of Iceberg metadata to Glue table is performed to display Iceberg
* information in Glue, but such information is only intended for informational human read access
* through tools like UI or CLI, and should never be used by any query processing engine to infer
* information like schema, partition spec, etc. The source of truth is stored in the actual
* Iceberg metadata file defined by the metadata_location table property.
*
* <p>If an existing Glue table is provided, the comments from its columns will be preserved in
* the resulting Glue TableInput. This is useful when updating an existing Glue table to retain
* any user-defined comments on the columns.
*
* @param tableInputBuilder Glue TableInput builder
* @param metadata Iceberg table metadata
* @param existingTable optional existing Glue table, used to preserve column comments
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata, Table existingTable) {
try {
Map<String, String> properties = metadata.properties();
StorageDescriptor.Builder storageDescriptor = StorageDescriptor.builder();
Expand All @@ -231,11 +255,33 @@ static void setTableInputInformation(
.collect(Collectors.toSet()));
}

Optional.ofNullable(properties.get(GLUE_DESCRIPTION_KEY))
.ifPresent(tableInputBuilder::description);
String description = properties.get(GLUE_DESCRIPTION_KEY);
if (description != null) {
tableInputBuilder.description(description);
} else if (existingTable != null) {
Optional.ofNullable(existingTable.description()).ifPresent(tableInputBuilder::description);
}

List<Column> columns = toColumns(metadata);
if (existingTable != null) {
List<Column> existingColumns = existingTable.storageDescriptor().columns();
Map<String, String> existingColumnMap =
existingColumns.stream().collect(Collectors.toMap(Column::name, Column::comment));

columns =
columns.stream()
.map(
newColumn -> {
String existingComment = existingColumnMap.get(newColumn.name());
return existingComment != null && newColumn.comment() == null
? newColumn.toBuilder().comment(existingComment).build()
: newColumn;
})
.collect(Collectors.toList());
}

tableInputBuilder.storageDescriptor(
storageDescriptor.location(metadata.location()).columns(toColumns(metadata)).build());
storageDescriptor.location(metadata.location()).columns(columns).build());
} catch (RuntimeException e) {
LOG.warn(
"Encountered unexpected exception while converting Iceberg metadata to Glue table information",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;

public class TestIcebergToGlueConverter {
Expand Down Expand Up @@ -311,4 +312,81 @@ public void testSetTableDescription() {
.as("description should match")
.isEqualTo(tableDescription);
}

@Test
public void testSetTableInputInformationWithExistingTable() {
// Actual TableInput
TableInput.Builder actualTableInputBuilder = TableInput.builder();
Schema schema =
new Schema(
Types.NestedField.required(1, "x", Types.StringType.get()),
Types.NestedField.required(2, "y", Types.StringType.get(), "new comment"),
Types.NestedField.required(3, "z", Types.StringType.get(), "new comment"));
PartitionSpec partitionSpec =
PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build();
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(schema, partitionSpec, "s3://test", tableLocationProperties);

// Existing Table
Table existingGlueTable =
Table.builder()
.storageDescriptor(
StorageDescriptor.builder()
.columns(
ImmutableList.of(
Column.builder().name("x").comment("existing comment").build(),
Column.builder().name("y").comment("existing comment").build()))
.build())
.build();

IcebergToGlueConverter.setTableInputInformation(
actualTableInputBuilder, tableMetadata, existingGlueTable);
TableInput actualTableInput = actualTableInputBuilder.build();

// Expected TableInput
TableInput expectedTableInput =
TableInput.builder()
.storageDescriptor(
StorageDescriptor.builder()
.location("s3://test")
.additionalLocations(Sets.newHashSet(tableLocationProperties.values()))
.columns(
ImmutableList.of(
Column.builder()
.name("x")
.type("string")
.comment("existing comment")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("y")
.type("string")
.comment("new comment")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("z")
.type("string")
.comment("new comment")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "3",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build()))
.build())
.build();

Assertions.assertThat(actualTableInput.storageDescriptor().columns())
.as("Columns should match")
.isEqualTo(expectedTableInput.storageDescriptor().columns());
}
}

0 comments on commit d094968

Please sign in to comment.