
Flink 1.19: Run without Hadoop #7369

Closed
wants to merge 7 commits

Conversation


@Fokko Fokko commented Apr 18, 2023

Allow Flink to run without Hadoop

This PR aims to remove Hadoop's Configuration class from the main code path, so Flink can also run without the Hadoop JARs on the Java classpath.

For example, the following PyFlink session reads from a REST catalog with only the Iceberg Flink runtime and AWS bundle JARs added, and no Hadoop JARs on the classpath:

Python 3.9.19 (main, Mar 19 2024, 16:08:27) 
[Clang 15.0.0 (clang-1500.3.9.4)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> 
>>> env.add_jars("file:///Users/fokkodriesprong/Desktop/iceberg/flink/v1.19/flink-runtime/build/libs/iceberg-flink-runtime-1.19-1.6.0-SNAPSHOT.jar")
>>> env.add_jars("file:///Users/fokkodriesprong/Desktop/iceberg/aws-bundle/build/libs/iceberg-aws-bundle-1.6.0-SNAPSHOT.jar")
>>> 
>>> from pyflink.table import StreamTableEnvironment
>>> 
>>> table_env = StreamTableEnvironment.create(env)
>>> 
>>> table_env.execute_sql("""
...   CREATE CATALOG tabular WITH (
...       'type'='iceberg',
...       'catalog-type'='rest',
...       'uri'='https://api.tabular.io/ws/',
...       'credential'='abc',
...       'warehouse'='Fokko',
...       'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
...       'auth.default-refresh-enabled'='true'
...  )
... """).print()
OK
>>> 
>>> 
>>> table_env.execute_sql("USE CATALOG tabular").print()
OK
>>> table_env.execute_sql("SELECT * FROM examples.nyc_taxi_locations").print()
+----+-------------+--------------------------------+--------------------------------+
| op | location_id |                        borough |                      zone_name |
+----+-------------+--------------------------------+--------------------------------+
| +I |           1 |                            EWR |                 Newark Airport |
| +I |           2 |                         Queens |                    Jamaica Bay |
| +I |           3 |                          Bronx |        Allerton/Pelham Gardens |
...
| +I |         265 |                        Unknown |                             NA |
+----+-------------+--------------------------------+--------------------------------+
265 rows in set

Testing

Testing is still pending. This PR focuses on read operations; for write operations, upstream changes are needed in Parquet-MR, with the main focus on the ParquetWriter class: https://github.com/apache/parquet-mr/blob/4bf606905924896403d25cd6287399cfe7050ce9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java#L25

Resolves #7332
Resolves #3117

@Fokko Fokko marked this pull request as ready for review June 26, 2023 08:05
@Fokko Fokko changed the title Flink: Run without Hadoop Flink 1.19: Run without Hadoop Apr 19, 2024
@@ -69,7 +69,7 @@ private FlinkConfigOptions() {}
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
.booleanType()
.noDefaultValue()
.defaultValue(false)
Contributor Author

This change is required:

py4j.protocol.Py4JJavaError: An error occurred while calling o42.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configurable
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
	at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
	at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.iceberg.hadoop.Util.usesHadoopFileIO(Util.java:122)
	at org.apache.iceberg.hadoop.Util.mayHaveBlockLocations(Util.java:92)
	at org.apache.iceberg.flink.source.SourceUtil.isLocalityEnabled(SourceUtil.java:43)
	at org.apache.iceberg.flink.source.FlinkSource$Builder.buildFormat(FlinkSource.java:260)
	at org.apache.iceberg.flink.source.FlinkSource$Builder.build(FlinkSource.java:272)
	at org.apache.iceberg.flink.source.IcebergTableSource.createDataStream(IcebergTableSource.java:128)
	at org.apache.iceberg.flink.source.IcebergTableSource.access$200(IcebergTableSource.java:55)
	at org.apache.iceberg.flink.source.IcebergTableSource$1.produceDataStream(IcebergTableSource.java:209)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:163)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:99)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.java:205)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.java:127)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1138)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	... 67 more
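
For context, a rough sketch (not the exact Iceberg code) of the locality check seen in the trace above; with noDefaultValue() the option resolves to null and the code falls through to the Hadoop-based probe, so defaulting it to false short-circuits before any Hadoop class is loaded:

  // Approximation of SourceUtil#isLocalityEnabled, for illustration only.
  static boolean isLocalityEnabled(Table table, ReadableConfig readableConfig, Boolean exposeLocality) {
    Boolean localityEnabled =
        exposeLocality != null
            ? exposeLocality
            : readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO);

    if (Boolean.FALSE.equals(localityEnabled)) {
      // With defaultValue(false) we return here and never touch Hadoop classes.
      return false;
    }

    // With noDefaultValue() the option is null, so this call is reached and
    // Util.mayHaveBlockLocations loads org.apache.hadoop.conf.Configurable,
    // producing the NoClassDefFoundError shown above.
    return Util.mayHaveBlockLocations(table.io(), table.location());
  }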

@Fokko Fokko force-pushed the fd-make-hadoop-optional branch 9 times, most recently from 16681f2 to dbe6ecb on April 22, 2024 15:01
@Fokko Fokko added this to the Iceberg 1.6.0 milestone May 22, 2024
@Fokko Fokko requested review from stevenzwu and pvary June 28, 2024 10:44

Fokko commented Jun 28, 2024

@stevenzwu @pvary do you have time to get some eyes on this one?

@@ -113,7 +114,8 @@ private String metadataFileLocation(Table table) {
}

private FileIO fileIO(Table table) {
if (table.io() instanceof HadoopConfigurable) {
if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
Contributor

What happens when Hadoop is not on the classpath, but the table's FileIO is HadoopConfigurable?

Contributor Author

No configuration will be passed through.
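
A simplified sketch (not verbatim from this PR) of how that plays out in the guarded branch above: when hadoop-common is missing, the HadoopConfigurable path is skipped entirely and the FileIO is used as-is.

  private FileIO fileIO(Table table) {
    if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
        && table.io() instanceof HadoopConfigurable) {
      // Hadoop is on the classpath: attach a serializable Hadoop Configuration
      // to the FileIO before it is shipped to the Flink tasks.
      HadoopConfigurable configurable = (HadoopConfigurable) table.io();
      configurable.serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
    }
    // Without hadoop-common the branch is skipped, so no configuration is
    // passed through and the FileIO is returned unchanged.
    return table.io();
  }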

"There should be a hive-site.xml file under the directory %s",
hiveConfDir);
newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
public static Object clusterHadoopConf() {
Contributor

Would this be better placed in HadoopUtil?

@@ -163,7 +163,8 @@ private static TableLoader createTableLoader(
String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
org.apache.hadoop.conf.Configuration hadoopConf =
(org.apache.hadoop.conf.Configuration) FlinkCatalogFactory.clusterHadoopConf();
Contributor

So Flink SQL will still need Hadoop on the classpath?

Contributor

agree with Peter that this seems problematic, as the return value can be null

@@ -54,7 +54,7 @@ static TableLoader fromHadoopTable(String location) {
return fromHadoopTable(location, FlinkCatalogFactory.clusterHadoopConf());
}

static TableLoader fromHadoopTable(String location, Configuration hadoopConf) {
static TableLoader fromHadoopTable(String location, Object hadoopConf) {
Contributor

I hate this part of the change.
We have a public API where the type is not defined.
Do we have any better solution for this?

Contributor Author

What do you think of the ParquetConfiguration approach that Parquet-Java took: apache/parquet-java#1141?
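
For reference, a loose, hypothetical sketch of the idea behind that approach: a small configuration interface of Iceberg's own that callers can use without any Hadoop types (the names below are illustrative only, not an existing API):

  // Hypothetical, Hadoop-free configuration abstraction, loosely modeled on
  // parquet-java's ParquetConfiguration; not part of Iceberg or this PR.
  public interface IcebergConfiguration {
    String get(String name);

    void set(String name, String value);
  }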

Contributor

It seems like quite a bit of effort to define an IcebergHadoopConfiguration class like parquet-java did.

Another possible option is to add a new overloaded method with a Flink Configuration arg. Flink provides a util method for the conversion:

public class HadoopUtils {
    @SuppressWarnings("deprecation")
    public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) 
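
For illustration, a possible shape of such an overload (hypothetical, not part of this PR), building on the HadoopUtils helper quoted above:

  // Hypothetical overload: accept Flink's own Configuration and convert it,
  // so the public TableLoader API never has to expose a raw Object parameter.
  static TableLoader fromHadoopTable(
      String location, org.apache.flink.configuration.Configuration flinkConf) {
    return fromHadoopTable(location, HadoopUtils.getHadoopConfiguration(flinkConf));
  }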

Comment on lines +37 to +39
public SerializableConfiguration(Object hadoopConf) {
this.hadoopConf = (Configuration) hadoopConf;
}


What's the point here? When there's no Hadoop on the classpath then it will blow up no matter what, right?
Additionally, explicit casts are just brittle. This question applies to all other such places where Object is passed.
In Flink this is solved by protecting Hadoop-specific class usages with isHadoopCommonOnClasspath, and that's it; works like a charm.

Contributor

I am also interested in the answer to Gabor's question.

also wondering if we can get overload ambiguity from the two constructors?

@@ -69,7 +69,7 @@ private FlinkConfigOptions() {}
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
.booleanType()
.noDefaultValue()
.defaultValue(false)
Contributor

This is a backwards-incompatible change. I'm not sure how widely it's used, but we need to do some research and be more vocal about this change if we decide to go ahead with it.

Contributor

Agree with @pvary that this changes the default behavior, which calls Util.mayHaveBlockLocations(table.io(), table.location()) from the Hadoop module to figure out if locality is enabled for the hdfs scheme.

Would it work if we added the isHadoopCommonOnClasspath check at the beginning of the Util#mayHaveBlockLocations method in the Hadoop module, and returned false if hadoop-common is not on the classpath? (A sketch of that guard follows the current method below.)

  public static boolean mayHaveBlockLocations(FileIO io, String location) {
    if (usesHadoopFileIO(io, location)) {
      InputFile inputFile = io.newInputFile(location);
      if (inputFile instanceof HadoopInputFile) {
        String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
        return LOCALITY_WHITELIST_FS.contains(scheme);

      } else {
        return false;
      }
    }

    return false;
  }
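
A minimal sketch of that suggested guard, assuming the HadoopDependency helper introduced in this PR and leaving the existing logic otherwise unchanged:

  public static boolean mayHaveBlockLocations(FileIO io, String location) {
    if (!HadoopDependency.isHadoopCommonOnClasspath(Util.class.getClassLoader())) {
      // hadoop-common is missing: HDFS-style locality cannot apply, so bail out
      // before any org.apache.hadoop type is referenced.
      return false;
    }

    if (usesHadoopFileIO(io, location)) {
      InputFile inputFile = io.newInputFile(location);
      if (inputFile instanceof HadoopInputFile) {
        String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
        return LOCALITY_WHITELIST_FS.contains(scheme);
      } else {
        return false;
      }
    }

    return false;
  }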

@Fokko Fokko removed this from the Iceberg 1.6.0 milestone Jul 2, 2024
@@ -47,4 +52,8 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
public Configuration get() {
return hadoopConf;
}

public Configuration getClone() {
Contributor

Maybe name this method just config().

getClone can look like a clone of this SerializableConfiguration class.


This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 29, 2024

github-actions bot commented Sep 5, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Sep 5, 2024

Successfully merging this pull request may close these issues.

Flink: Make Hadoop an optional dependency
Flink: Decouple the iceberg integration work from hadoop libraries