From cb8a8c6925d8fbf34c64bea417606cac05e5b5d1 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Mon, 1 Mar 2021 15:46:38 +0100 Subject: [PATCH] [hotfix][runtime] Add job ID to RuntimeContext --- .../connector/jdbc/xa/JdbcXaSinkTestBase.java | 7 +++++++ .../api/common/functions/RuntimeContext.java | 9 +++++++++ .../common/functions/util/RuntimeUDFContext.java | 7 +++++++ .../flink/cep/operator/CepRuntimeContext.java | 7 +++++++ .../api/runtime/SavepointRuntimeContext.java | 7 +++++++ .../iterative/task/AbstractIterativeTask.java | 16 +++++++++++++--- .../flink/runtime/operators/BatchTask.java | 3 ++- .../flink/runtime/operators/DataSinkTask.java | 3 ++- .../flink/runtime/operators/DataSourceTask.java | 3 ++- .../operators/chaining/ChainedDriver.java | 3 ++- .../util/DistributedRuntimeUDFContext.java | 13 ++++++++++++- .../api/functions/async/RichAsyncFunction.java | 12 ++++++++++++ .../api/operators/StreamingRuntimeContext.java | 7 +++++++ pom.xml | 2 +- 14 files changed, 90 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java index 2ab0b06baa0f0..edb3a9fa07645 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.jdbc.xa; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; @@ -62,6 +63,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -165,6 +167,11 @@ static JdbcXaSinkFunction buildSink( static final RuntimeContext TEST_RUNTIME_CONTEXT = new RuntimeContext() { + @Override + public Optional getJobId() { + return Optional.of(new JobID()); + } + @Override public String getTaskName() { return "test"; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 06c10171d4550..fc6f1fcd409fd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; @@ -42,6 +43,7 @@ import java.io.Serializable; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -56,6 +58,13 @@ @Public public interface RuntimeContext { + /** + * The ID of the current job. Empty if the execution happens outside of any job context (e.g. + * standalone collection executor). The returned ID should NOT be used for any job management + * tasks. + */ + Optional getJobId(); + /** * Returns the name of the task in which the UDF runs, as assigned during plan construction. * diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index ffa5c2fe4d527..8823ede0ed19c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.externalresource.ExternalResourceInfo; @@ -32,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -114,6 +116,11 @@ public C getBroadcastVariableWithInitializer( } } + @Override + public Optional getJobId() { + return Optional.empty(); + } + @Override public Set getExternalResourceInfos(String resourceName) { throw new UnsupportedOperationException( diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java index 476fa1cc94359..e88a503bc34fb 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; @@ -43,6 +44,7 @@ import java.io.Serializable; import java.util.List; +import java.util.Optional; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,6 +65,11 @@ class CepRuntimeContext implements RuntimeContext { this.runtimeContext = checkNotNull(runtimeContext); } + @Override + public Optional getJobId() { + return runtimeContext.getJobId(); + } + @Override public String getTaskName() { return runtimeContext.getTaskName(); diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java index c197a92382b49..df20cd35d8fe0 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointRuntimeContext.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; @@ -48,6 +49,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -78,6 +80,11 @@ public SavepointRuntimeContext(RuntimeContext ctx, KeyedStateStore keyedStateSto this.registeredDescriptors = new ArrayList<>(); } + @Override + public Optional getJobId() { + return ctx.getJobId(); + } + @Override public String getTaskName() { return ctx.getTaskName(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java index 35dac5ebf80d9..bf434186aab5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.iterative.task; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.aggregators.Aggregator; @@ -61,6 +62,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Future; /** The abstract base class for all tasks able to participate in an iteration. */ @@ -192,7 +194,8 @@ public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) { env.getDistributedCacheEntries(), this.accumulatorMap, metrics, - env.getExternalResourceInfoProvider()); + env.getExternalResourceInfoProvider(), + env.getJobID()); } // -------------------------------------------------------------------------------------------- @@ -399,7 +402,8 @@ public IterativeRuntimeUdfContext( Map> cpTasks, Map> accumulatorMap, MetricGroup metrics, - ExternalResourceInfoProvider externalResourceInfoProvider) { + ExternalResourceInfoProvider externalResourceInfoProvider, + JobID jobID) { super( taskInfo, userCodeClassLoader, @@ -407,7 +411,8 @@ public IterativeRuntimeUdfContext( cpTasks, accumulatorMap, metrics, - externalResourceInfoProvider); + externalResourceInfoProvider, + jobID); } @Override @@ -426,6 +431,11 @@ public T getPreviousIterationAggregate(String name) { return (T) getIterationAggregators().getPreviousGlobalAggregate(name); } + @Override + public Optional getJobId() { + return runtimeUdfContext.getJobId(); + } + @Override public void addAccumulator( String name, Accumulator newAccumulator) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index aa5fe8c982a17..a282ec1d4b7c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -1144,7 +1144,8 @@ public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) { env.getDistributedCacheEntries(), this.accumulatorMap, metrics, - env.getExternalResourceInfoProvider()); + env.getExternalResourceInfoProvider(), + env.getJobID()); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index af5ef9079c4fa..9f3a427c3c190 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -445,6 +445,7 @@ public DistributedRuntimeUDFContext createRuntimeContext() { getEnvironment() .getMetricGroup() .getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()), - env.getExternalResourceInfoProvider()); + env.getExternalResourceInfoProvider(), + env.getJobID()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 2d54b53daf5d1..eb05e73383727 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -433,6 +433,7 @@ public DistributedRuntimeUDFContext createRuntimeContext() { env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), getEnvironment().getMetricGroup().getOrAddOperator(sourceName), - env.getExternalResourceInfoProvider()); + env.getExternalResourceInfoProvider(), + env.getJobID()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 7a3df08ccc8df..9e0e76d7d30e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -91,7 +91,8 @@ public void setup( env.getDistributedCacheEntries(), accumulatorMap, metrics, - env.getExternalResourceInfoProvider()); + env.getExternalResourceInfoProvider(), + env.getJobID()); } this.executionConfig = executionConfig; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java index 38c3785ea622c..32bcee42d93ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators.util; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.externalresource.ExternalResourceInfo; @@ -36,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -47,6 +49,8 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { private final ExternalResourceInfoProvider externalResourceInfoProvider; + private final JobID jobID; + public DistributedRuntimeUDFContext( TaskInfo taskInfo, UserCodeClassLoader userCodeClassLoader, @@ -54,10 +58,12 @@ public DistributedRuntimeUDFContext( Map> cpTasks, Map> accumulators, MetricGroup metrics, - ExternalResourceInfoProvider externalResourceInfoProvider) { + ExternalResourceInfoProvider externalResourceInfoProvider, + JobID jobID) { super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics); this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider); + this.jobID = jobID; } @Override @@ -108,6 +114,11 @@ public C getBroadcastVariableWithInitializer( } } + @Override + public Optional getJobId() { + return Optional.of(jobID); + } + @Override public Set getExternalResourceInfos(String resourceName) { return externalResourceInfoProvider.getExternalResourceInfos(resourceName); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java index 6e5ffbe1411f6..3ec6cecec491e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; @@ -49,6 +50,7 @@ import java.io.Serializable; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -104,6 +106,11 @@ private static class RichAsyncFunctionRuntimeContext implements RuntimeContext { runtimeContext = Preconditions.checkNotNull(context); } + @Override + public Optional getJobId() { + return runtimeContext.getJobId(); + } + @Override public String getTaskName() { return runtimeContext.getTaskName(); @@ -290,5 +297,10 @@ public T getPreviousIterationAggregate(String name) { throw new UnsupportedOperationException( "Iteration aggregators are not supported in rich async functions."); } + + @Override + public Optional getJobId() { + return iterationRuntimeContext.getJobId(); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 681e79783e84c..d282621406fe4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.externalresource.ExternalResourceInfo; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; @@ -51,6 +52,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -158,6 +160,11 @@ public TaskManagerRuntimeInfo getTaskManagerRuntimeInfo() { return taskEnvironment.getTaskManagerInfo(); } + @Override + public Optional getJobId() { + return Optional.of(taskEnvironment.getJobID()); + } + @Override public Set getExternalResourceInfos(String resourceName) { return externalResourceInfoProvider.getExternalResourceInfos(resourceName); diff --git a/pom.xml b/pom.xml index 3e5c120875e8c..f9ee93bd20fce 100644 --- a/pom.xml +++ b/pom.xml @@ -154,7 +154,7 @@ under the License. For Hadoop 2.7, the minor Hadoop version supported for flink-shaded-hadoop-2-uber is 2.7.5 --> 2.7.5 - 1.12.0 + 1.13.0 tools/japicmp-output 2.4.2