[Managed Iceberg] Support BigQuery Metastore catalog (apache#32242)
ahmedabu98 committed Aug 21, 2024
1 parent b3a874f commit 3bf2421
Showing 7 changed files with 322 additions and 1 deletion.
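For orientation, here is a minimal sketch (not part of this commit) of what the change enables: writing Beam Rows to an Iceberg table whose metadata lives in BigQuery Metastore, through the Managed API. The config keys mirror getManagedIcebergConfig() in the integration test added below; the class name and all project, table, and warehouse values are placeholders.

// Minimal sketch: Managed Iceberg write against the BigQuery Metastore
// catalog. Config shape follows the integration test in this commit;
// "my-gcp-project", "my_dataset.my_table", and the gs:// path are
// placeholders, not values from the commit.
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class BqmsWriteSketch {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("str").build();
    Row row = Row.withSchema(schema).addValue("hello").build();

    // Same keys as getManagedIcebergConfig() in BigQueryMetastoreCatalogIT.
    Map<String, Object> config =
        ImmutableMap.<String, Object>builder()
            .put("table", "my_dataset.my_table")
            .put("catalog_name", "my_catalog")
            .put(
                "catalog_properties",
                ImmutableMap.<String, String>builder()
                    .put("gcp_project", "my-gcp-project")
                    .put("gcp_location", "us-central1")
                    .put(
                        "catalog-impl",
                        "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
                    .put("warehouse", "gs://my-bucket/warehouse")
                    .build())
            .build();

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(Create.of(row))
        .setRowSchema(schema)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }
}

As the diffs below show, the catalog ships as a shaded artifact (:sdks:java:io:iceberg:bigquerymetastore) pulled in by the expansion service at runtime, and its integration test is excluded on Java 8.
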
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 5
+  "modification": 6
 }
2 changes: 2 additions & 0 deletions sdks/java/io/expansion-service/build.gradle
@@ -52,6 +52,8 @@ dependencies {
   // Needed for HiveCatalog
   runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2")
   runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow")
+  // Needed for BigQuery Metastore catalog (this isn't supported for Java 8)
+  runtimeOnly project(path: ":sdks:java:io:iceberg:bigquerymetastore", configuration: "shadow")
 
   runtimeOnly library.java.kafka_clients
   runtimeOnly library.java.slf4j_jdk14
sdks/java/io/iceberg/bigquerymetastore/bqms-catalog/iceberg-bigquery-catalog-1.5.2-0.1.0.jar
Binary file not shown.
37 changes: 37 additions & 0 deletions sdks/java/io/iceberg/bigquerymetastore/build.gradle
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins {
  id 'org.apache.beam.module'
}

applyJavaNature(
  automaticModuleName: 'org.apache.beam.sdk.io.iceberg.bqms',
  shadowClosure: {
    dependencies {
      include(dependency(files("bqms-catalog/iceberg-bigquery-catalog-1.5.2-0.1.0.jar")))
    }
    relocate 'com.google.guava', getJavaRelocatedPath('iceberg.bqms.com.google.guava')
  },
)

description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg :: BigQuery Metastore"
ext.summary = "A copy of the BQMS catalog with some popular libraries relocated."

dependencies {
  implementation files("bqms-catalog/iceberg-bigquery-catalog-1.5.2-0.1.0.jar")
}
6 changes: 6 additions & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -55,6 +55,7 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
   implementation library.java.hadoop_common
 
+  testImplementation project(path: ":sdks:java:io:iceberg:bigquerymetastore", configuration: "shadow")
   testImplementation library.java.hadoop_client
   testImplementation library.java.bigdataoss_gcsio
   testImplementation library.java.bigdataoss_gcs_connector
@@ -109,6 +110,11 @@ task integrationTest(type: Test) {
   outputs.upToDateWhen { false }
 
   include '**/*IT.class'
+  // BQ metastore catalog doesn't support Java 8
+  if (project.findProperty('testJavaVersion') == '8' ||
+      JavaVersion.current().equals(JavaVersion.VERSION_1_8)) {
+    exclude '**/BigQueryMetastoreCatalogIT.class'
+  }
 
   maxParallelForks 4
   classpath = sourceSets.test.runtimeClasspath
274 changes: 274 additions & 0 deletions sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BigQueryMetastoreCatalogIT.java
@@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
* Integration tests for reading and writing Iceberg tables using the BigQuery Metastore Catalog.
*/
public class BigQueryMetastoreCatalogIT {
  private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
      Schema.builder()
          .addStringField("doubly_nested_str")
          .addInt64Field("doubly_nested_float")
          .build();

  private static final Schema NESTED_ROW_SCHEMA =
      Schema.builder()
          .addStringField("nested_str")
          .addInt32Field("nested_int")
          .addFloatField("nested_float")
          .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
          .build();
  private static final Schema BEAM_SCHEMA =
      Schema.builder()
          .addStringField("str")
          .addBooleanField("bool")
          .addNullableInt32Field("nullable_int")
          .addNullableInt64Field("nullable_long")
          .addArrayField("arr_long", Schema.FieldType.INT64)
          .addRowField("row", NESTED_ROW_SCHEMA)
          .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
          .build();

  private static final SimpleFunction<Long, Row> ROW_FUNC =
      new SimpleFunction<Long, Row>() {
        @Override
        public Row apply(Long num) {
          String strNum = Long.toString(num);
          Row nestedRow =
              Row.withSchema(NESTED_ROW_SCHEMA)
                  .addValue("nested_str_value_" + strNum)
                  .addValue(Integer.valueOf(strNum))
                  .addValue(Float.valueOf(strNum + "." + strNum))
                  .addValue(
                      Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
                          .addValue("doubly_nested_str_value_" + strNum)
                          .addValue(num)
                          .build())
                  .build();

          return Row.withSchema(BEAM_SCHEMA)
              .addValue("str_value_" + strNum)
              .addValue(num % 2 == 0)
              .addValue(Integer.valueOf(strNum))
              .addValue(num)
              .addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
              .addValue(nestedRow)
              .addValue(num % 2 == 0 ? null : nestedRow)
              .build();
        }
      };

  private static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
      IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
  private static final SimpleFunction<Row, Record> RECORD_FUNC =
      new SimpleFunction<Row, Record>() {
        @Override
        public Record apply(Row input) {
          return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input);
        }
      };

  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Rule public TestPipeline readPipeline = TestPipeline.create();

  private static final String TEST_CATALOG = "beam_test_" + System.nanoTime();
  private static final String DATASET = "iceberg_bigquerymetastore_test_" + System.nanoTime();
  @Rule public TestName testName = new TestName();
  private static final String WAREHOUSE = TestPipeline.testingPipelineOptions().getTempLocation();
  private static Catalog catalog;
  private static Map<String, String> catalogProps;
  private TableIdentifier tableIdentifier;

  @BeforeClass
  public static void setUp() {
    GcpOptions options = TestPipeline.testingPipelineOptions().as(GcpOptions.class);
    catalogProps =
        ImmutableMap.<String, String>builder()
            .put("gcp_project", options.getProject())
            .put("gcp_location", "us-central1")
            .put("catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
            .put("warehouse", WAREHOUSE)
            .build();
    catalog =
        CatalogUtil.loadCatalog(
            catalogProps.get("catalog-impl"), TEST_CATALOG, catalogProps, new Configuration());
    catalog.initialize(TEST_CATALOG, catalogProps);
    ((SupportsNamespaces) catalog).createNamespace(Namespace.of(DATASET));
  }

  @After
  public void cleanup() {
    // Tables must be cleaned up before the dataset can be deleted
    catalog.dropTable(tableIdentifier);
  }

  @AfterClass
  public static void tearDown() {
    ((SupportsNamespaces) catalog).dropNamespace(Namespace.of(DATASET));
  }

  private Map<String, Object> getManagedIcebergConfig(TableIdentifier table) {
    return ImmutableMap.<String, Object>builder()
        .put("table", table.toString())
        .put("catalog_name", TEST_CATALOG)
        .put("catalog_properties", catalogProps)
        .build();
  }

  @Test
  public void testReadWithBqmsCatalog() throws IOException {
    tableIdentifier =
        TableIdentifier.parse(String.format("%s.%s", DATASET, testName.getMethodName()));
    Table table = catalog.createTable(tableIdentifier, ICEBERG_SCHEMA);

    List<Row> expectedRows =
        LongStream.range(1, 1000).boxed().map(ROW_FUNC::apply).collect(Collectors.toList());
    List<Record> records =
        expectedRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());

    // write iceberg records with bqms catalog
    String filepath = table.location() + "/" + UUID.randomUUID();
    DataWriter<Record> writer =
        Parquet.writeData(table.io().newOutputFile(filepath))
            .schema(ICEBERG_SCHEMA)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .overwrite()
            .withSpec(table.spec())
            .build();
    for (Record rec : records) {
      writer.write(rec);
    }
    writer.close();
    AppendFiles appendFiles = table.newAppend();
    String manifestFilename = FileFormat.AVRO.addExtension(filepath + ".manifest");
    OutputFile outputFile = table.io().newOutputFile(manifestFilename);
    ManifestWriter<DataFile> manifestWriter;
    try (ManifestWriter<DataFile> openWriter = ManifestFiles.write(table.spec(), outputFile)) {
      openWriter.add(writer.toDataFile());
      manifestWriter = openWriter;
    }
    appendFiles.appendManifest(manifestWriter.toManifestFile());
    appendFiles.commit();

    // Run Managed Iceberg read
    PCollection<Row> outputRows =
        readPipeline
            .apply(
                Managed.read(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)))
            .getSinglePCollection();
    PAssert.that(outputRows).containsInAnyOrder(expectedRows);
    readPipeline.run().waitUntilFinish();
  }

  @Test
  public void testWriteWithBqmsCatalog() {
    tableIdentifier =
        TableIdentifier.parse(String.format("%s.%s", DATASET, testName.getMethodName()));
    catalog.createTable(tableIdentifier, IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA));

    List<Row> inputRows =
        LongStream.range(1, 1000).mapToObj(ROW_FUNC::apply).collect(Collectors.toList());
    List<Record> expectedRecords =
        inputRows.stream().map(RECORD_FUNC::apply).collect(Collectors.toList());

    // Run Managed Iceberg write
    writePipeline
        .apply(Create.of(inputRows))
        .setRowSchema(BEAM_SCHEMA)
        .apply(Managed.write(Managed.ICEBERG).withConfig(getManagedIcebergConfig(tableIdentifier)));
    writePipeline.run().waitUntilFinish();

    // read back the records and check everything's there
    Table table = catalog.loadTable(tableIdentifier);
    TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA);
    List<Record> writtenRecords = new ArrayList<>();
    for (CombinedScanTask task : tableScan.planTasks()) {
      InputFilesDecryptor decryptor =
          new InputFilesDecryptor(task, table.io(), table.encryption());
      for (FileScanTask fileTask : task.files()) {
        InputFile inputFile = decryptor.getInputFile(fileTask);
        CloseableIterable<Record> iterable =
            Parquet.read(inputFile)
                .split(fileTask.start(), fileTask.length())
                .project(ICEBERG_SCHEMA)
                .createReaderFunc(
                    fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema))
                .filter(fileTask.residual())
                .build();

        for (Record rec : iterable) {
          writtenRecords.add(rec);
        }
      }
    }
    assertThat(expectedRecords, containsInAnyOrder(writtenRecords.toArray()));
  }
}
2 changes: 2 additions & 0 deletions settings.gradle.kts
@@ -369,3 +369,5 @@ include("sdks:java:io:iceberg:hive")
 findProject(":sdks:java:io:iceberg:hive")?.name = "hive"
 include("sdks:java:io:iceberg:hive:exec")
 findProject(":sdks:java:io:iceberg:hive:exec")?.name = "exec"
+include("sdks:java:io:iceberg:bigquerymetastore")
+findProject(":sdks:java:io:iceberg:bigquerymetastore")?.name = "bigquerymetastore"
