Backport HadoopCatalog related classes in Flink #10620

Merged
merged 1 commit on Jul 2, 2024
Backport HadoopCatalog related classes in Flink
tomtongue committed Jul 2, 2024
commit 68087da10f8b583c792a0927dbd87cf6fa37ddbb
HadoopTableExtension.java (new file)
@@ -0,0 +1,59 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.extension.ExtensionContext;

public class HadoopTableExtension extends HadoopCatalogExtension {
  private final Schema schema;
  private final PartitionSpec partitionSpec;

  private Table table;

  public HadoopTableExtension(String database, String tableName, Schema schema) {
    this(database, tableName, schema, null);
  }

  public HadoopTableExtension(
      String database, String tableName, Schema schema, PartitionSpec partitionSpec) {
    super(database, tableName);
    this.schema = schema;
    this.partitionSpec = partitionSpec;
  }

  @Override
  public void beforeEach(ExtensionContext context) throws Exception {
    super.beforeEach(context);
    if (partitionSpec == null) {
      this.table = catalog.createTable(TableIdentifier.of(database, tableName), schema);
    } else {
      this.table =
          catalog.createTable(TableIdentifier.of(database, tableName), schema, partitionSpec);
    }
    tableLoader.open();
  }

  public Table table() {
    return table;
  }
}
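
For reference, a minimal sketch of how a test might consume this new extension. The test class name and the choice of TestFixtures.DATABASE, TestFixtures.TABLE, and SimpleDataUtil.SCHEMA as fixtures are illustrative assumptions, not part of this commit:

// Hypothetical usage sketch, not part of this commit. It assumes the TestFixtures and
// SimpleDataUtil helpers that the test changes below already reference.
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.flink.HadoopTableExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExampleHadoopTableTest {

  // The extension creates the table in beforeEach(), so every test can read it via table().
  @RegisterExtension
  static final HadoopTableExtension tableExtension =
      new HadoopTableExtension(TestFixtures.DATABASE, TestFixtures.TABLE, SimpleDataUtil.SCHEMA);

  @Test
  void tableIsCreatedForEachTest() {
    assertThat(tableExtension.table()).isNotNull();
  }
}
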
TestFlinkIcebergSink.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
@@ -29,77 +31,74 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
@ExtendWith(ParameterizedTestExtension.class)
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@RegisterExtension
public static MiniClusterExtension miniClusterResource =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

@Rule
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
@RegisterExtension
private static final HadoopCatalogExtension catalogResource =
new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);

private TableLoader tableLoader;

private final FileFormat format;
private final int parallelism;
private final boolean partitioned;
@Parameter(index = 0)
private FileFormat format;

@Parameter(index = 1)
private int parallelism;

@Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
@Parameter(index = 2)
private boolean partitioned;

@Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
public static Object[][] parameters() {
return new Object[][] {
{"avro", 1, true},
{"avro", 1, false},
{"avro", 2, true},
{"avro", 2, false},
{"orc", 1, true},
{"orc", 1, false},
{"orc", 2, true},
{"orc", 2, false},
{"parquet", 1, true},
{"parquet", 1, false},
{"parquet", 2, true},
{"parquet", 2, false}
{FileFormat.AVRO, 1, true},
{FileFormat.AVRO, 1, false},
{FileFormat.AVRO, 2, true},
{FileFormat.AVRO, 2, false},
{FileFormat.ORC, 1, true},
{FileFormat.ORC, 1, false},
{FileFormat.ORC, 2, true},
{FileFormat.ORC, 2, false},
{FileFormat.PARQUET, 1, true},
{FileFormat.PARQUET, 1, false},
{FileFormat.PARQUET, 2, true},
{FileFormat.PARQUET, 2, false}
};
}

public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned) {
this.format = FileFormat.fromString(format);
this.parallelism = parallelism;
this.partitioned = partitioned;
}

@Before
@BeforeEach
public void before() throws IOException {
table =
catalogResource
@@ -122,7 +121,7 @@ public void before() throws IOException {
tableLoader = catalogResource.tableLoader();
}

@Test
@TestTemplate
public void testWriteRowData() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
@@ -165,17 +164,17 @@ private int partitionFiles(String partition) throws IOException {
return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
}

@Test
@TestTemplate
public void testWriteRow() throws Exception {
testWriteRow(null, DistributionMode.NONE);
}

@Test
@TestTemplate
public void testWriteRowWithTableSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
}

@Test
@TestTemplate
public void testJobNoneDistributeMode() throws Exception {
table
.updateProperties()
@@ -187,12 +186,12 @@ public void testJobNoneDistributeMode() throws Exception {
if (parallelism > 1) {
if (partitioned) {
int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
Assert.assertTrue("Should have more than 3 files in iceberg table.", files > 3);
assertThat(files).isGreaterThan(3);
}
}
}

@Test
@TestTemplate
public void testJobHashDistributionMode() {
table
.updateProperties()
@@ -204,7 +203,7 @@ public void testJobHashDistributionMode() {
.hasMessage("Flink does not support 'range' write distribution mode now.");
}

@Test
@TestTemplate
public void testJobNullDistributionMode() throws Exception {
table
.updateProperties()
@@ -214,42 +213,33 @@ public void testJobNullDistributionMode() throws Exception {
testWriteRow(null, null);

if (partitioned) {
Assert.assertEquals(
"There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
Assert.assertEquals(
"There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
Assert.assertEquals(
"There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
assertThat(partitionFiles("aaa")).isEqualTo(1);
assertThat(partitionFiles("bbb")).isEqualTo(1);
assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}

@Test
@TestTemplate
public void testPartitionWriteMode() throws Exception {
testWriteRow(null, DistributionMode.HASH);
if (partitioned) {
Assert.assertEquals(
"There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
Assert.assertEquals(
"There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
Assert.assertEquals(
"There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
assertThat(partitionFiles("aaa")).isEqualTo(1);
assertThat(partitionFiles("bbb")).isEqualTo(1);
assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}

@Test
@TestTemplate
public void testShuffleByPartitionWithSchema() throws Exception {
testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
if (partitioned) {
Assert.assertEquals(
"There should be only 1 data file in partition 'aaa'", 1, partitionFiles("aaa"));
Assert.assertEquals(
"There should be only 1 data file in partition 'bbb'", 1, partitionFiles("bbb"));
Assert.assertEquals(
"There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
assertThat(partitionFiles("aaa")).isEqualTo(1);
assertThat(partitionFiles("bbb")).isEqualTo(1);
assertThat(partitionFiles("ccc")).isEqualTo(1);
}
}

@Test
@TestTemplate
public void testTwoSinksInDisjointedDAG() throws Exception {
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());

@@ -323,16 +313,14 @@ public void testTwoSinksInDisjointedDAG() throws Exception {
SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));

leftTable.refresh();
Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test"));
Assert.assertNull(leftTable.currentSnapshot().summary().get("direction"));
assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test", "direction");
rightTable.refresh();
Assert.assertEquals(
TestFlinkIcebergSink.class.getName(),
rightTable.currentSnapshot().summary().get("flink.test"));
Assert.assertEquals("rightTable", rightTable.currentSnapshot().summary().get("direction"));
assertThat(rightTable.currentSnapshot().summary())
.containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
.containsEntry("direction", "rightTable");
}

@Test
@TestTemplate
public void testOverrideWriteConfigWithUnknownDistributionMode() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
@@ -352,7 +340,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
.hasMessage("Invalid distribution mode: UNRECOGNIZED");
}

@Test
@TestTemplate
public void testOverrideWriteConfigWithUnknownFileFormat() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
@@ -372,7 +360,7 @@ public void testOverrideWriteConfigWithUnknownFileFormat() {
.hasMessage("Invalid file format: UNRECOGNIZED");
}

@Test
@TestTemplate
public void testWriteRowWithTableRefreshInterval() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
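
For context, the ParameterizedTestExtension pattern adopted above can be sketched in isolation. The class name, parameter rows, and assertion are illustrative only; the annotations and the extension are the ones this diff imports from org.apache.iceberg and JUnit Jupiter:

// Illustrative sketch of Iceberg's JUnit 5 parameterization pattern; not part of this commit.
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
class ExampleParameterizedTest {

  // The extension injects one row of parameters() into these fields per invocation.
  @Parameter(index = 0)
  private FileFormat format;

  @Parameter(index = 1)
  private int parallelism;

  @Parameters(name = "format={0}, parallelism={1}")
  public static Object[][] parameters() {
    return new Object[][] {
      {FileFormat.AVRO, 1},
      {FileFormat.PARQUET, 2}
    };
  }

  // Each @TestTemplate method runs once for every row returned by parameters().
  @TestTemplate
  void runsOncePerParameterRow() {
    assertThat(parallelism).isGreaterThan(0);
  }
}
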