Skip to content

Commit

Permalink
[hotfix][runtime] Add job ID to RuntimeContext
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Mar 2, 2021
1 parent 9874f01 commit cb8a8c6
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.connector.jdbc.xa;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
Expand Down Expand Up @@ -62,6 +63,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -165,6 +167,11 @@ static JdbcXaSinkFunction<TestEntry> buildSink(

static final RuntimeContext TEST_RUNTIME_CONTEXT =
new RuntimeContext() {
@Override
public Optional<JobID> getJobId() {
return Optional.of(new JobID());
}

@Override
public String getTaskName() {
return "test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
Expand All @@ -42,6 +43,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -56,6 +58,13 @@
@Public
public interface RuntimeContext {

/**
* The ID of the current job. Empty if the execution happens outside of any job context (e.g.
* standalone collection executor). The returned ID should NOT be used for any job management
* tasks.
*/
Optional<JobID> getJobId();

/**
* Returns the name of the task in which the UDF runs, as assigned during plan construction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
Expand All @@ -32,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -114,6 +116,11 @@ public <T, C> C getBroadcastVariableWithInitializer(
}
}

@Override
public Optional<JobID> getJobId() {
return Optional.empty();
}

@Override
public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
Expand All @@ -43,6 +44,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -63,6 +65,11 @@ class CepRuntimeContext implements RuntimeContext {
this.runtimeContext = checkNotNull(runtimeContext);
}

@Override
public Optional<JobID> getJobId() {
return runtimeContext.getJobId();
}

@Override
public String getTaskName() {
return runtimeContext.getTaskName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -78,6 +80,11 @@ public SavepointRuntimeContext(RuntimeContext ctx, KeyedStateStore keyedStateSto
this.registeredDescriptors = new ArrayList<>();
}

@Override
public Optional<JobID> getJobId() {
return ctx.getJobId();
}

@Override
public String getTaskName() {
return ctx.getTaskName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.iterative.task;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.aggregators.Aggregator;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;

/** The abstract base class for all tasks able to participate in an iteration. */
Expand Down Expand Up @@ -192,7 +194,8 @@ public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
env.getDistributedCacheEntries(),
this.accumulatorMap,
metrics,
env.getExternalResourceInfoProvider());
env.getExternalResourceInfoProvider(),
env.getJobID());
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -399,15 +402,17 @@ public IterativeRuntimeUdfContext(
Map<String, Future<Path>> cpTasks,
Map<String, Accumulator<?, ?>> accumulatorMap,
MetricGroup metrics,
ExternalResourceInfoProvider externalResourceInfoProvider) {
ExternalResourceInfoProvider externalResourceInfoProvider,
JobID jobID) {
super(
taskInfo,
userCodeClassLoader,
executionConfig,
cpTasks,
accumulatorMap,
metrics,
externalResourceInfoProvider);
externalResourceInfoProvider,
jobID);
}

@Override
Expand All @@ -426,6 +431,11 @@ public <T extends Value> T getPreviousIterationAggregate(String name) {
return (T) getIterationAggregators().getPreviousGlobalAggregate(name);
}

@Override
public Optional<JobID> getJobId() {
return runtimeUdfContext.getJobId();
}

@Override
public <V, A extends Serializable> void addAccumulator(
String name, Accumulator<V, A> newAccumulator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,8 @@ public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) {
env.getDistributedCacheEntries(),
this.accumulatorMap,
metrics,
env.getExternalResourceInfoProvider());
env.getExternalResourceInfoProvider(),
env.getJobID());
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ public DistributedRuntimeUDFContext createRuntimeContext() {
getEnvironment()
.getMetricGroup()
.getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()),
env.getExternalResourceInfoProvider());
env.getExternalResourceInfoProvider(),
env.getJobID());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ public DistributedRuntimeUDFContext createRuntimeContext() {
env.getDistributedCacheEntries(),
env.getAccumulatorRegistry().getUserMap(),
getEnvironment().getMetricGroup().getOrAddOperator(sourceName),
env.getExternalResourceInfoProvider());
env.getExternalResourceInfoProvider(),
env.getJobID());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void setup(
env.getDistributedCacheEntries(),
accumulatorMap,
metrics,
env.getExternalResourceInfoProvider());
env.getExternalResourceInfoProvider(),
env.getJobID());
}

this.executionConfig = executionConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.util;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
Expand All @@ -36,6 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;

Expand All @@ -47,17 +49,21 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {

private final ExternalResourceInfoProvider externalResourceInfoProvider;

private final JobID jobID;

public DistributedRuntimeUDFContext(
TaskInfo taskInfo,
UserCodeClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks,
Map<String, Accumulator<?, ?>> accumulators,
MetricGroup metrics,
ExternalResourceInfoProvider externalResourceInfoProvider) {
ExternalResourceInfoProvider externalResourceInfoProvider,
JobID jobID) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks, metrics);
this.externalResourceInfoProvider =
Preconditions.checkNotNull(externalResourceInfoProvider);
this.jobID = jobID;
}

@Override
Expand Down Expand Up @@ -108,6 +114,11 @@ public <T, C> C getBroadcastVariableWithInitializer(
}
}

@Override
public Optional<JobID> getJobId() {
return Optional.of(jobID);
}

@Override
public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.DoubleCounter;
import org.apache.flink.api.common.accumulators.Histogram;
Expand Down Expand Up @@ -49,6 +50,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -104,6 +106,11 @@ private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
runtimeContext = Preconditions.checkNotNull(context);
}

@Override
public Optional<JobID> getJobId() {
return runtimeContext.getJobId();
}

@Override
public String getTaskName() {
return runtimeContext.getTaskName();
Expand Down Expand Up @@ -290,5 +297,10 @@ public <T extends Value> T getPreviousIterationAggregate(String name) {
throw new UnsupportedOperationException(
"Iteration aggregators are not supported in rich async functions.");
}

@Override
public Optional<JobID> getJobId() {
return iterationRuntimeContext.getJobId();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
Expand Down Expand Up @@ -51,6 +52,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -158,6 +160,11 @@ public TaskManagerRuntimeInfo getTaskManagerRuntimeInfo() {
return taskEnvironment.getTaskManagerInfo();
}

@Override
public Optional<JobID> getJobId() {
return Optional.of(taskEnvironment.getJobID());
}

@Override
public Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName) {
return externalResourceInfoProvider.getExternalResourceInfos(resourceName);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ under the License.
For Hadoop 2.7, the minor Hadoop version supported for flink-shaded-hadoop-2-uber is 2.7.5
-->
<hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
<japicmp.referenceVersion>1.12.0</japicmp.referenceVersion>
<japicmp.referenceVersion>1.13.0</japicmp.referenceVersion>
<japicmp.outputDir>tools/japicmp-output</japicmp.outputDir>
<spotless.version>2.4.2</spotless.version>

Expand Down

0 comments on commit cb8a8c6

Please sign in to comment.