+ implements PlannerFactory {
+
+ public DelegatePlannerFactory() {
+ super(PlannerModule.getInstance().loadPlannerFactory());
+ }
+
+ @Override
+ public Planner create(Context context) {
+ return delegate.create(context);
+ }
+}
diff --git a/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
new file mode 100644
index 0000000000000..2630fc9106dd5
--- /dev/null
+++ b/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.loader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.classloading.ComponentClassLoader;
+import org.apache.flink.core.classloading.SubmoduleClassLoader;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.ExpressionParserFactory;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/**
+ * Module holder that loads the flink-table-planner module in a separate classpath.
+ *
+ * This loader expects the flink-table-planner jar to be accessible via {@link
+ * ClassLoader#getResource(String)}. It will extract the jar into a temporary directory and create a
+ * new {@link SubmoduleClassLoader} to load the various planner factories from that jar.
+ */
+class PlannerModule {
+
+ /**
+ * The name of the table planner dependency jar, bundled with flink-table-planner-loader module
+ * artifact.
+ */
+ static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";
+
+ static final String HINT_USAGE =
+ "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";
+
+ private final ClassLoader submoduleClassLoader;
+
+ private PlannerModule() {
+ try {
+ final ClassLoader flinkClassLoader = PlannerModule.class.getClassLoader();
+
+ final Path tmpDirectory =
+ Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]);
+ Files.createDirectories(tmpDirectory);
+ final Path tempFile =
+ Files.createFile(
+ tmpDirectory.resolve(
+ "flink-table-planner_" + UUID.randomUUID() + ".jar"));
+
+ final InputStream resourceStream =
+ flinkClassLoader.getResourceAsStream(FLINK_TABLE_PLANNER_FAT_JAR);
+ if (resourceStream == null) {
+ throw new TableException(
+ String.format(
+ "Flink Table planner could not be found. If this happened while running a test in the IDE, "
+ + "run '%s' on the command-line, "
+ + "or add a test dependency on the flink-table-planner-loader test-jar.",
+ HINT_USAGE));
+ }
+
+ IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
+
+ String[] ownerClassPath =
+ Stream.concat(
+ Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
+ Stream.of(
+ // These packages are shipped either by
+ // flink-table-runtime or flink-dist itself
+ "org.codehaus.janino",
+ "org.codehaus.commons",
+ "org.apache.commons.lang3",
+ // Used by org.reflections
+ "javassist"))
+ .toArray(String[]::new);
+ String[] componentClassPath = new String[] {"org.apache.flink"};
+
+ this.submoduleClassLoader =
+ new ComponentClassLoader(
+ new URL[] {tempFile.toUri().toURL()},
+ flinkClassLoader,
+ ownerClassPath,
+ componentClassPath);
+ } catch (IOException e) {
+ throw new TableException(
+ "Could not initialize the table planner components loader.", e);
+ }
+ }
+
+ // Singleton lazy initialization
+
+ private static class PlannerComponentsHolder {
+ private static final PlannerModule INSTANCE = new PlannerModule();
+ }
+
+ public static PlannerModule getInstance() {
+ return PlannerComponentsHolder.INSTANCE;
+ }
+
+ // load methods for various components provided by the planner
+
+ public ExecutorFactory loadExecutorFactory() {
+ return FactoryUtil.discoverFactory(
+ this.submoduleClassLoader,
+ ExecutorFactory.class,
+ ExecutorFactory.DEFAULT_IDENTIFIER);
+ }
+
+ public PlannerFactory loadPlannerFactory() {
+ return FactoryUtil.discoverFactory(
+ this.submoduleClassLoader, PlannerFactory.class, PlannerFactory.DEFAULT_IDENTIFIER);
+ }
+
+ public ExpressionParserFactory loadExpressionParserFactory() {
+ return FactoryUtil.discoverFactory(
+ this.submoduleClassLoader,
+ ExpressionParserFactory.class,
+ ExpressionParserFactory.DEFAULT_IDENTIFIER);
+ }
+}
diff --git a/flink-table/flink-table-planner-loader/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner-loader/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000..880ba99c48558
--- /dev/null
+++ b/flink-table/flink-table-planner-loader/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.planner.loader.DelegateExecutorFactory
+org.apache.flink.table.planner.loader.DelegateExpressionParserFactory
+org.apache.flink.table.planner.loader.DelegatePlannerFactory
diff --git a/flink-table/flink-table-planner-loader/src/test/java/org/apache/flink/table/planner/loader/LoaderITCase.java b/flink-table/flink-table-planner-loader/src/test/java/org/apache/flink/table/planner/loader/LoaderITCase.java
new file mode 100644
index 0000000000000..52574bb4e9b02
--- /dev/null
+++ b/flink-table/flink-table-planner-loader/src/test/java/org/apache/flink/table/planner/loader/LoaderITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.loader;
+
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.ExpressionParserFactory;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.planner.loader.PlannerModule.FLINK_TABLE_PLANNER_FAT_JAR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for the services loaded through {@link PlannerModule}.
+ *
+ *
This must be an ITCase so that it runs after the 'package' phase of maven. Otherwise, the
+ * flink-table-planner jar will not be available.
+ *
+ *
This test might fail in your IDE if the flink-table-planner-loader module is wrongly configure
+ * to include flink-table-planner in the test classpath.
+ */
+public class LoaderITCase extends TestLogger {
+
+ @Test
+ public void testExecutorFactory() {
+ assertThat(
+ DelegateExecutorFactory.class
+ .getClassLoader()
+ .getResourceAsStream(FLINK_TABLE_PLANNER_FAT_JAR))
+ .isNotNull();
+
+ ExecutorFactory executorFactory =
+ FactoryUtil.discoverFactory(
+ LoaderITCase.class.getClassLoader(),
+ ExecutorFactory.class,
+ ExecutorFactory.DEFAULT_IDENTIFIER);
+
+ assertThat(executorFactory).isNotNull().isInstanceOf(DelegateExecutorFactory.class);
+ assertThat(executorFactory.factoryIdentifier())
+ .isEqualTo(ExecutorFactory.DEFAULT_IDENTIFIER);
+ }
+
+ @Test
+ public void testPlannerFactory() {
+ assertThat(
+ DelegatePlannerFactory.class
+ .getClassLoader()
+ .getResourceAsStream(FLINK_TABLE_PLANNER_FAT_JAR))
+ .isNotNull();
+
+ PlannerFactory plannerFactory =
+ FactoryUtil.discoverFactory(
+ LoaderITCase.class.getClassLoader(),
+ PlannerFactory.class,
+ PlannerFactory.DEFAULT_IDENTIFIER);
+
+ assertThat(plannerFactory).isNotNull().isInstanceOf(DelegatePlannerFactory.class);
+ assertThat(plannerFactory.factoryIdentifier()).isEqualTo(PlannerFactory.DEFAULT_IDENTIFIER);
+ }
+
+ @Test
+ public void testExpressionParserFactory() {
+ assertThat(
+ DelegateExpressionParserFactory.class
+ .getClassLoader()
+ .getResourceAsStream(FLINK_TABLE_PLANNER_FAT_JAR))
+ .isNotNull();
+
+ ExpressionParserFactory expressionParserFactory =
+ FactoryUtil.discoverFactory(
+ LoaderITCase.class.getClassLoader(),
+ ExpressionParserFactory.class,
+ ExpressionParserFactory.DEFAULT_IDENTIFIER);
+
+ assertThat(expressionParserFactory)
+ .isNotNull()
+ .isInstanceOf(DelegateExpressionParserFactory.class);
+ assertThat(expressionParserFactory.factoryIdentifier())
+ .isEqualTo(ExpressionParserFactory.DEFAULT_IDENTIFIER);
+ }
+}
diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml
index d89cfc33c5c33..f57850d351a93 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -39,8 +39,7 @@ under the License.
jar
-
+
com.google.guava
guava
@@ -49,37 +48,28 @@ under the License.
org.codehaus.janino
commons-compiler
-
+
org.codehaus.janino
janino
-
-
+
org.apache.flink
- flink-table-common
+ flink-table-api-java-bridge
${project.version}
-
- org.apache.flink
- flink-table-api-java
- ${project.version}
-
+
org.apache.flink
- flink-table-api-bridge-base
+ flink-scala_${scala.binary.version}
${project.version}
-
- org.apache.flink
- flink-table-api-java-bridge
- ${project.version}
-
+
org.apache.flink
@@ -92,7 +82,6 @@ under the License.
-
org.apache.flink
flink-sql-parser-hive
@@ -105,23 +94,15 @@ under the License.
+
+
org.apache.flink
flink-table-runtime
${project.version}
-
- org.apache.flink
- flink-scala_${scala.binary.version}
- ${project.version}
-
-
-
- org.apache.flink
- flink-streaming-scala_${scala.binary.version}
- ${project.version}
-
+
org.apache.flink
@@ -130,8 +111,10 @@ under the License.
provided
-
+
+
+
org.apache.calcite
calcite-core
@@ -185,6 +168,10 @@ under the License.
org.apache.commons
commons-dbcp2
+
+ org.apache.commons
+ commons-lang3
+
com.fasterxml.jackson.dataformat
jackson-dataformat-yaml
@@ -205,20 +192,32 @@ under the License.
-
- com.ibm.icu
- icu4j
- 67.1
+
+ org.scala-lang.modules
+ scala-parser-combinators_${scala.binary.version}
-
- org.scala-lang.modules
- scala-parser-combinators_${scala.binary.version}
+
+ org.reflections
+ reflections
+ 0.9.10
+ compile
+
+
+ com.google.code.findbugs
+ annotations
+
+
+ com.google.guava
+ guava
+
+
-
+
+
org.apache.flink
flink-test-utils
@@ -226,30 +225,31 @@ under the License.
test
+
+
org.apache.flink
- flink-core
+ flink-table-api-scala_${scala.binary.version}
${project.version}
- test-jar
test
-
org.apache.flink
- flink-table-api-scala_${scala.binary.version}
+ flink-table-api-scala-bridge_${scala.binary.version}
${project.version}
test
org.apache.flink
- flink-table-api-scala-bridge_${scala.binary.version}
+ flink-core
${project.version}
+ test-jar
test
-
+
org.apache.flink
flink-tests
${project.version}
@@ -296,31 +296,13 @@ under the License.
test
-
+
org.apache.flink
flink-connector-files
${project.version}
test
-
-
-
- org.reflections
- reflections
- 0.9.10
-
-
- com.google.code.findbugs
- annotations
-
-
- com.google.guava
- guava
-
-
- compile
-
@@ -356,103 +338,130 @@ under the License.
org.apache.maven.plugins
maven-shade-plugin
+
+
+ true
+ ${project.basedir}/target/dependency-reduced-pom.xml
+
+
+ *:*
+
+
+ org-apache-calcite-jdbc.properties
+ common.proto
+ requests.proto
+ responses.proto
+ codegen/**
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ META-INF/services/java.sql.Driver
+ META-INF/versions/11/module-info.class
+
+ LICENSE
+
+
+
+
+
+ org.apache.calcite:*
+ org.apache.calcite.avatica:*
+
+
+ com.esri.geometry:esri-geometry-api
+ com.google.guava:guava
+ com.google.guava:failureaccess
+ commons-codec:commons-codec
+ commons-io:commons-io
+
+
+ org.apache.flink:flink-sql-parser
+ org.apache.flink:flink-sql-parser-hive
+
+
+ org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}
+
+
+ org.reflections:reflections
+
+
+
+
+
+
+
+
+ com.google
+ org.apache.flink.calcite.shaded.com.google
+
+
+ com.jayway
+ org.apache.flink.calcite.shaded.com.jayway
+
+
+ com.fasterxml
+ org.apache.flink.shaded.jackson2.com.fasterxml
+
+
+ org.apache.commons.codec
+ org.apache.flink.calcite.shaded.org.apache.commons.codec
+
+
+ org.apache.commons.io
+ org.apache.flink.calcite.shaded.org.apache.commons.io
+
+
+
+ org.reflections
+ org.apache.flink.table.shaded.org.reflections
+
+
+
+
+ com.ibm.icu
+ org.apache.flink.table.shaded.com.ibm.icu
+
+
+
+
shade-flink
+
+ none
+
+
+
+ shade-loader-bundle
+ package
+
+ shade
+
-
-
- *:*
-
-
- org-apache-calcite-jdbc.properties
- common.proto
- requests.proto
- responses.proto
- codegen/**
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
- META-INF/services/java.sql.Driver
- META-INF/versions/11/module-info.class
-
- LICENSE
-
-
-
+ true
+ loader-bundle
- org.apache.calcite:*
- org.apache.calcite.avatica:*
-
-
- com.esri.geometry:esri-geometry-api
- com.google.guava:guava
- com.google.guava:failureaccess
- commons-codec:commons-codec
- commons-io:commons-io
-
-
- org.apache.flink:flink-sql-parser
- org.apache.flink:flink-sql-parser-hive
-
-
- org.codehaus.janino:*
- org.apache.flink:flink-table-code-splitter
-
-
- com.ibm.icu:icu4j
-
-
- org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}
-
- org.reflections:reflections
+
+ org.scala-lang:*
+ org.apache.flink:flink-scala_${scala.binary.version}
-
-
-
-
-
-
- com.google
- org.apache.flink.calcite.shaded.com.google
-
-
-
- com.jayway
- org.apache.flink.calcite.shaded.com.jayway
-
-
-
- com.fasterxml
- org.apache.flink.shaded.jackson2.com.fasterxml
-
-
- org.apache.commons.codec
- org.apache.flink.calcite.shaded.org.apache.commons.codec
-
-
- org.apache.commons.io
- org.apache.flink.calcite.shaded.org.apache.commons.io
-
-
-
-
-
-
-
- org.reflections
- org.apache.flink.table.shaded.org.reflections
-
-
-
+
+
+
+ shade-distribution
+ package
+
+ shade
+
+
+ false
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index ced64c8bf86e6..34bd234e6d941 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -53,6 +53,8 @@ public static ObjectMapper createObjectMapper(SerdeContext serdeCtx) {
null, // JsonFactory
null, // DefaultSerializerProvider
ctx);
+ mapper.setTypeFactory(
+ mapper.getTypeFactory().withClassLoader(JsonSerdeUtil.class.getClassLoader()));
mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
ctx.setObjectMapper(mapper);
return mapper;
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
index 2b53463f2986c..0696cfe825d2c 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/NOTICE
@@ -16,17 +16,6 @@ This project bundles the following dependencies under the Apache Software Licens
- commons-codec:commons-codec:1.15
- commons-io:commons-io:2.11.0
-This project bundles the following dependencies under the BSD license.
-See bundled license files for details
-
-- org.codehaus.janino:janino:3.0.11
-- org.codehaus.janino:commons-compiler:3.0.11
-
-This project bundles the following dependencies under the ICU license.
-See bundled license files for details
-
-- com.ibm.icu:icu4j:67.1
-
This project bundles the following dependencies under the WTFPL license.
See bundled license files for details
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index a4e735103992e..f18183e69e202 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.ReadableConfig
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
-import org.apache.flink.table.api.{PlannerType, SqlDialect, TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.api._
import org.apache.flink.table.catalog._
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.delegation.{Executor, Parser, Planner}
@@ -161,8 +161,8 @@ abstract class PlannerBase(
def createNewParser: Parser = {
val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase
- val parserFactory = FactoryUtil.discoverFactory(Thread.currentThread.getContextClassLoader,
- classOf[ParserFactory], factoryIdentifier)
+ val parserFactory = FactoryUtil.discoverFactory(
+ getClass.getClassLoader, classOf[ParserFactory], factoryIdentifier)
val context = new DefaultParserContext(catalogManager, plannerContext)
parserFactory.create(context)
diff --git a/flink-table/flink-table-runtime/pom.xml b/flink-table/flink-table-runtime/pom.xml
index 0c85dcc7653db..3a116fd8abc38 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -37,9 +37,7 @@ under the License.
jar
-
-
-
+
org.apache.flink
flink-table-common
@@ -64,27 +62,24 @@ under the License.
${project.version}
-
- org.apache.flink
- flink-streaming-java
- ${project.version}
- provided
-
-
+
org.apache.flink
flink-cep
${project.version}
- provided
+
org.codehaus.janino
janino
- ${janino.version}
+
+
+ org.codehaus.janino
+ commons-compiler
-
+
com.jayway.jsonpath
@@ -92,7 +87,7 @@ under the License.
${jsonpath.version}
-
+
org.apache.flink
@@ -143,6 +138,7 @@ under the License.
org.apache.maven.plugins
maven-shade-plugin
+
shade-flink
package
@@ -153,9 +149,12 @@ under the License.
com.jayway.jsonpath:json-path
+ org.codehaus.janino:*
+ org.apache.flink:flink-table-code-splitter
+
com.jayway
org.apache.flink.calcite.shaded.com.jayway
diff --git a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
index 33fe52a325b32..0b85bd761b2b7 100644
--- a/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-runtime/src/main/resources/META-INF/NOTICE
@@ -7,3 +7,5 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- com.jayway.jsonpath:json-path:2.6.0
+- org.codehaus.janino:janino:3.0.11
+- org.codehaus.janino:commons-compiler:3.0.11
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/licenses/LICENSE.janino b/flink-table/flink-table-runtime/src/main/resources/META-INF/licenses/LICENSE.janino
similarity index 100%
rename from flink-table/flink-table-planner/src/main/resources/META-INF/licenses/LICENSE.janino
rename to flink-table/flink-table-runtime/src/main/resources/META-INF/licenses/LICENSE.janino
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index 850931ecc692b..b8605d2ed7970 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -39,9 +39,10 @@ under the License.
flink-table-api-bridge-base
flink-table-api-java-bridge
flink-table-api-scala-bridge
+ flink-table-api-java-uber
flink-table-planner
+ flink-table-planner-loader
flink-table-runtime
- flink-table-uber
flink-sql-client
flink-sql-parser
flink-sql-parser-hive
diff --git a/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java b/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
index 239bdd5bff0c4..1a00dfb538b4b 100644
--- a/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
+++ b/tools/ci/java-ci-tools/src/main/java/org/apache/flink/tools/ci/suffixcheck/ScalaSuffixChecker.java
@@ -27,6 +27,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -52,7 +53,15 @@ public class ScalaSuffixChecker {
// [INFO] +- org.scala-lang:scala-reflect:jar:2.11.12:test
private static final Pattern scalaSuffixPattern = Pattern.compile("_2.1[0-9]");
- private static final String AKKA_RPC_MODULE_NAME = "flink-rpc-akka";
+ private static final Set EXCLUDED_MODULES =
+ new HashSet<>(
+ Arrays.asList(
+ // we ignore flink-rpc-akka because it is loaded through a separate
+ // class loader
+ "flink-rpc-akka",
+ // we ignore flink-table-planner-loader because it loads the planner
+ // through a different classpath
+ "flink-table-planner-loader"));
public static void main(String[] args) throws IOException {
if (args.length < 2) {
@@ -95,9 +104,7 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException
final Matcher matcher = moduleNamePattern.matcher(line);
if (matcher.matches()) {
final String moduleName = stripScalaSuffix(matcher.group(1));
- // we ignored flink-rpc-akka because it is loaded through a separate class
- // loader
- if (moduleName.equals(AKKA_RPC_MODULE_NAME)) {
+ if (isExcluded(moduleName)) {
continue;
}
LOG.trace("Parsing module '{}'.", moduleName);
@@ -112,12 +119,12 @@ private static ParseResult parseMavenOutput(final Path path) throws IOException
final boolean isTestDependency = line.endsWith(":test");
// we ignored flink-rpc-akka because it is loaded through a separate class
// loader
- final boolean isFlinkAkkaRpc = line.contains(AKKA_RPC_MODULE_NAME);
+ final boolean isExcluded = isExcluded(line);
LOG.trace("\tline:{}", line);
LOG.trace("\t\tdepends-on-scala:{}", dependsOnScala);
LOG.trace("\t\tis-test-dependency:{}", isTestDependency);
- LOG.trace("\t\tis-flink-rpc-akka:{}", isFlinkAkkaRpc);
- if (dependsOnScala && !isTestDependency && !isFlinkAkkaRpc) {
+ LOG.trace("\t\tis-excluded:{}", isExcluded);
+ if (dependsOnScala && !isTestDependency && !isExcluded) {
LOG.trace("\t\tOutbreak detected at {}!", moduleName);
infected = true;
}
@@ -242,6 +249,10 @@ private static Collection checkModules(
return violations;
}
+ private static boolean isExcluded(String line) {
+ return EXCLUDED_MODULES.stream().anyMatch(line::contains);
+ }
+
private static class ParseResult {
private final Set cleanModules;
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 882660994dd67..894977fa3ee5f 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -66,6 +66,7 @@ flink-table/flink-table-api-java-bridge,\
flink-table/flink-table-api-scala-bridge,\
flink-table/flink-sql-client,\
flink-table/flink-table-planner,\
+flink-table/flink-table-planner-loader,\
flink-table/flink-table-runtime,\
flink-table/flink-table-code-splitter"