Flink 1.19: Run without Hadoop #7369
Conversation
@@ -69,7 +69,7 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
       ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
           .booleanType()
-          .noDefaultValue()
+          .defaultValue(false)
This change is required:
py4j.protocol.Py4JJavaError: An error occurred while calling o42.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configurable
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.iceberg.hadoop.Util.usesHadoopFileIO(Util.java:122)
at org.apache.iceberg.hadoop.Util.mayHaveBlockLocations(Util.java:92)
at org.apache.iceberg.flink.source.SourceUtil.isLocalityEnabled(SourceUtil.java:43)
at org.apache.iceberg.flink.source.FlinkSource$Builder.buildFormat(FlinkSource.java:260)
at org.apache.iceberg.flink.source.FlinkSource$Builder.build(FlinkSource.java:272)
at org.apache.iceberg.flink.source.IcebergTableSource.createDataStream(IcebergTableSource.java:128)
at org.apache.iceberg.flink.source.IcebergTableSource.access$200(IcebergTableSource.java:55)
at org.apache.iceberg.flink.source.IcebergTableSource$1.produceDataStream(IcebergTableSource.java:209)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:163)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:99)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.java:205)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit.translateToPlanInternal(StreamExecLimit.java:127)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1138)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 67 more
@stevenzwu @pvary do you have time to get some eyes on this one?
@@ -113,7 +114,8 @@ private String metadataFileLocation(Table table) {
   }

   private FileIO fileIO(Table table) {
-    if (table.io() instanceof HadoopConfigurable) {
+    if (HadoopDependency.isHadoopCommonOnClasspath(SerializableTable.class.getClassLoader())
What happens when Hadoop is not on the classpath, but the table's FileIO is HadoopConfigurable?
No configuration will be passed through.
        "There should be a hive-site.xml file under the directory %s",
        hiveConfDir);
    newConf.addResource(new Path(hiveConfDir, "hive-site.xml"));
public static Object clusterHadoopConf() {
Would this be better placed in HadoopUtil?
@@ -163,7 +163,8 @@ private static TableLoader createTableLoader(
   String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
   Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");

-  org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
+  org.apache.hadoop.conf.Configuration hadoopConf =
+      (org.apache.hadoop.conf.Configuration) FlinkCatalogFactory.clusterHadoopConf();
So Flink SQL will still need Hadoop on the classpath?
Agree with Peter that this seems problematic, as the return value can be null.
@@ -54,7 +54,7 @@ static TableLoader fromHadoopTable(String location) {
   return fromHadoopTable(location, FlinkCatalogFactory.clusterHadoopConf());
 }

- static TableLoader fromHadoopTable(String location, Configuration hadoopConf) {
+ static TableLoader fromHadoopTable(String location, Object hadoopConf) {
I hate this part of the change. We have a public API where the type is not defined. Do we have any better solution for this?
What do you think of the ParquetConfiguration approach that Parquet-Java took: apache/parquet-java#1141?
It seems quite a bit of effort to define an IcebergHadoopConfiguration class like parquet-java did. Another possible option is to add a new overloaded method with a Flink Configuration arg. Flink provides a util method for the conversion:
public class HadoopUtils {
@SuppressWarnings("deprecation")
public static Configuration getHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfiguration)
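That overload pattern can be sketched with stand-in types. Note that HadoopConf, FlinkConf, and toHadoopConf below are hypothetical placeholders for org.apache.hadoop.conf.Configuration, org.apache.flink.configuration.Configuration, and Flink's HadoopUtils.getHadoopConfiguration, not the real classes; the point is that the public API stays fully typed, and the Hadoop type appears only in the Hadoop-specific overload instead of widening the parameter to Object.

```java
public class TableLoaderOverloadSketch {
    // Placeholders for org.apache.hadoop.conf.Configuration and
    // org.apache.flink.configuration.Configuration.
    static class HadoopConf {
        final String source;
        HadoopConf(String source) { this.source = source; }
    }
    static class FlinkConf {}

    // Stand-in for Flink's HadoopUtils.getHadoopConfiguration(flinkConf).
    static HadoopConf toHadoopConf(FlinkConf flinkConf) {
        return new HadoopConf("derived-from-flink");
    }

    // The existing Hadoop-typed entry point keeps its signature...
    static String fromHadoopTable(String location, HadoopConf conf) {
        return location + " via " + conf.source;
    }

    // ...and the new overload lets callers that only have a Flink
    // Configuration avoid touching Hadoop types at all.
    static String fromHadoopTable(String location, FlinkConf flinkConf) {
        return fromHadoopTable(location, toHadoopConf(flinkConf));
    }
}
```

Since the two parameter types are unrelated reference types, overload resolution stays unambiguous.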
public SerializableConfiguration(Object hadoopConf) {
  this.hadoopConf = (Configuration) hadoopConf;
}
What's the point here? When there's no Hadoop on the classpath, then it will blow up no matter what, right? Additionally, explicit casts are just brittle. This question applies to all other such places where Object is passed. In Flink this is solved in a way that Hadoop-specific class usages are protected with isHadoopCommonOnClasspath, and that's it; works like a charm.
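A minimal sketch of that guard pattern (the helper name here is hypothetical; the probed class is the one missing in the stack trace above): Class.forName with initialize=false locates the class without running its static initializers, so the probe itself is side-effect-free.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClasspathProbe {

    // True if the named class is loadable from the given loader.
    // initialize=false means the class is located but not initialized,
    // so probing has no side effects.
    public static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // A loader with no URLs and a null parent sees only bootstrap
        // classes: java.lang.String resolves, Hadoop classes do not.
        try (URLClassLoader empty = new URLClassLoader(new URL[0], null)) {
            System.out.println(isOnClasspath("java.lang.String", empty));
            System.out.println(isOnClasspath("org.apache.hadoop.conf.Configurable", empty));
        }
    }
}
```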
I am also interested in the answer to Gabor's question. Also wondering if we can get overload ambiguity from the two constructors?
@@ -69,7 +69,7 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_EXPOSE_SPLIT_LOCALITY_INFO =
       ConfigOptions.key("table.exec.iceberg.expose-split-locality-info")
           .booleanType()
-          .noDefaultValue()
+          .defaultValue(false)
This is a backwards-incompatible change. I'm not sure how widely it's used, but we need to do some research and be more vocal about this change if we decide to go ahead with it.
Agree with @pvary that this changes the default behavior, which calls Util.mayHaveBlockLocations(table.io(), table.location()) from the Hadoop module to figure out if locality is enabled for the hdfs scheme. Would it work if we add the isHadoopCommonOnClasspath check at the beginning of the Util#mayHaveBlockLocations method in the Hadoop module, and return false if hadoop-common is not on the classpath?
public static boolean mayHaveBlockLocations(FileIO io, String location) {
  if (usesHadoopFileIO(io, location)) {
    InputFile inputFile = io.newInputFile(location);
    if (inputFile instanceof HadoopInputFile) {
      String scheme = ((HadoopInputFile) inputFile).getFileSystem().getScheme();
      return LOCALITY_WHITELIST_FS.contains(scheme);
    } else {
      return false;
    }
  }
  return false;
}
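A rough sketch of what that guarded variant could look like. Stand-ins are used throughout: isHadoopCommonOnClasspath is the proposed helper (not an existing Iceberg API), and the signature is simplified to take the scheme directly instead of Iceberg's FileIO/location pair, so this only illustrates the short-circuit, not the real Util method.

```java
import java.util.Set;

public class LocalityGuardSketch {

    // Mirrors the idea of Util.LOCALITY_WHITELIST_FS: schemes for which
    // exposing block locality is worthwhile.
    private static final Set<String> LOCALITY_WHITELIST_FS = Set.of("hdfs");

    // Stand-in for the proposed HadoopDependency.isHadoopCommonOnClasspath.
    static boolean isHadoopCommonOnClasspath(ClassLoader loader) {
        try {
            Class.forName("org.apache.hadoop.conf.Configurable", false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    // Simplified mayHaveBlockLocations: return false before any Hadoop type
    // is referenced, so the NoClassDefFoundError from the stack trace above
    // cannot be reached on a Hadoop-free classpath.
    static boolean mayHaveBlockLocations(String scheme, ClassLoader loader) {
        if (!isHadoopCommonOnClasspath(loader)) {
            return false;
        }
        return LOCALITY_WHITELIST_FS.contains(scheme);
    }
}
```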
@@ -47,4 +52,8 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
   public Configuration get() {
     return hadoopConf;
   }

+  public Configuration getClone() {
Maybe name this method just config(). getClone can look like a clone of this SerializableConfiguration class.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Allow Flink to run without Hadoop
This PR aims to remove Hadoop's Configuration class from the main code path, so we can also run Flink without having the Hadoop JARs on the Java classpath.
Testing
Testing is still pending. This PR focuses on read operations. For write operations, upstream changes need to be done to Parquet-MR, with the main focus on the ParquetWriter class: https://github.com/apache/parquet-mr/blob/4bf606905924896403d25cd6287399cfe7050ce9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java#L25
Resolves #7332
Resolves #3117