Skip to content

Commit

Permalink
[FLINK-25831] Retrieve the latest non-null prior property from Execut…
Browse files Browse the repository at this point in the history
…ionVertex

In order to not lose information about prior Executions, this commit iterates over all
prior executions in the ExecutionVertex in order to find the first non-null prior location
and allocation.

This closes apache#18526.
  • Loading branch information
tillrohrmann committed Jan 27, 2022
1 parent b89a31f commit da29180
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -283,15 +284,30 @@ public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
}
}

public ArchivedExecution getLatestPriorExecution() {
synchronized (priorExecutions) {
final int size = priorExecutions.size();
if (size > 0) {
return priorExecutions.get(size - 1);
} else {
return null;
/**
* Gets the latest property from a prior execution that is not null.
*
* @param extractor defining the property to extract
* @param <T> type of the property
* @return Optional containing the latest property if it exists; otherwise {@code
* Optional.empty()}.
*/
private <T> Optional<T> getLatestPriorProperty(Function<ArchivedExecution, T> extractor) {
int index = priorExecutions.size() - 1;

while (index >= 0 && !priorExecutions.isDroppedIndex(index)) {
final ArchivedExecution archivedExecution = priorExecutions.get(index);

final T extractedValue = extractor.apply(archivedExecution);

if (extractedValue != null) {
return Optional.of(extractedValue);
}

index -= 1;
}

return Optional.empty();
}

/**
Expand All @@ -300,16 +316,12 @@ public ArchivedExecution getLatestPriorExecution() {
*
* @return The latest prior execution location, or null, if there is none, yet.
*/
public TaskManagerLocation getLatestPriorLocation() {
ArchivedExecution latestPriorExecution = getLatestPriorExecution();
return latestPriorExecution != null
? latestPriorExecution.getAssignedResourceLocation()
: null;
public Optional<TaskManagerLocation> findLatestPriorLocation() {
return getLatestPriorProperty(ArchivedExecution::getAssignedResourceLocation);
}

public AllocationID getLatestPriorAllocation() {
ArchivedExecution latestPriorExecution = getLatestPriorExecution();
return latestPriorExecution != null ? latestPriorExecution.getAssignedAllocationID() : null;
public Optional<AllocationID> findLatestPriorAllocation() {
return getLatestPriorProperty(ArchivedExecution::getAssignedAllocationID);
}

EvictingBoundedList<ArchivedExecution> getCopyOfPriorExecutionsList() {
Expand Down Expand Up @@ -345,7 +357,7 @@ public Optional<TaskManagerLocation> getPreferredLocationBasedOnState() {
// only restore to same execution if it has state
if (currentExecution.getTaskRestore() != null
&& currentExecution.getTaskRestore().getTaskStateSnapshot().hasState()) {
return Optional.ofNullable(getLatestPriorLocation());
return findLatestPriorLocation();
}

return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,9 @@ public ResourceProfile getResourceProfile(final ExecutionVertexID executionVerte
}

@Override
public AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
return getExecutionVertex(executionVertexId).getLatestPriorAllocation();
public Optional<AllocationID> findPriorAllocationId(
final ExecutionVertexID executionVertexId) {
return getExecutionVertex(executionVertexId).findLatestPriorAllocation();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

import java.util.Optional;
import java.util.Set;

/** Context for slot allocation. */
Expand All @@ -44,9 +45,10 @@ interface ExecutionSlotAllocationContext extends InputsLocationsRetriever, State
* Returns prior allocation id for an execution vertex.
*
* @param executionVertexId id of the execution vertex
* @return prior allocation id for the given execution vertex
* @return prior allocation id for the given execution vertex if it exists; otherwise {@code
* Optional.empty()}
*/
AllocationID getPriorAllocationId(ExecutionVertexID executionVertexId);
Optional<AllocationID> findPriorAllocationId(ExecutionVertexID executionVertexId);

/**
* Returns the scheduling topology containing all execution vertices and edges.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -38,13 +39,13 @@ class MergingSharedSlotProfileRetrieverFactory

private final SyncPreferredLocationsRetriever preferredLocationsRetriever;

private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
private final Function<ExecutionVertexID, Optional<AllocationID>> priorAllocationIdRetriever;

private final Supplier<Set<AllocationID>> reservedAllocationIdsRetriever;

MergingSharedSlotProfileRetrieverFactory(
SyncPreferredLocationsRetriever preferredLocationsRetriever,
Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
Function<ExecutionVertexID, Optional<AllocationID>> priorAllocationIdRetriever,
Supplier<Set<AllocationID>> reservedAllocationIdsRetriever) {
this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
this.priorAllocationIdRetriever = Preconditions.checkNotNull(priorAllocationIdRetriever);
Expand Down Expand Up @@ -98,7 +99,7 @@ public SlotProfile getSlotProfile(
Collection<AllocationID> priorAllocations = new HashSet<>();
Collection<TaskManagerLocation> preferredLocations = new ArrayList<>();
for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {
priorAllocations.add(priorAllocationIdRetriever.apply(execution));
priorAllocationIdRetriever.apply(execution).ifPresent(priorAllocations::add);
preferredLocations.addAll(
preferredLocationsRetriever.getPreferredLocations(
execution, producersToIgnore));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex
SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory =
new MergingSharedSlotProfileRetrieverFactory(
preferredLocationsRetriever,
context::getPriorAllocationId,
context::findPriorAllocationId,
context::getReservedAllocations);
return new SlotSharingExecutionSlotAllocator(
slotProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
Expand All @@ -29,6 +30,9 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -95,4 +99,50 @@ public void testResetForNewExecutionReleasesPartitions() throws Exception {

assertThat(releasePartitionsFuture.get()).contains(resultPartitionID);
}

@Test
public void testFindLatestAllocationIgnoresFailedAttempts() throws Exception {
final JobVertex source = ExecutionGraphTestUtils.createNoOpVertex(1);
final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source);
final TestingPhysicalSlotProvider withLimitedAmountOfPhysicalSlots =
TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
final SchedulerBase scheduler =
SchedulerTestingUtils.newSchedulerBuilder(
jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
.setExecutionSlotAllocatorFactory(
SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
withLimitedAmountOfPhysicalSlots))
.build();

scheduler.startScheduling();

final ExecutionJobVertex sourceExecutionJobVertex =
scheduler.getExecutionJobVertex(source.getID());

final ExecutionVertex sourceExecutionVertex = sourceExecutionJobVertex.getTaskVertices()[0];
final Execution firstExecution = sourceExecutionVertex.getCurrentExecutionAttempt();

final TestingPhysicalSlot physicalSlot =
withLimitedAmountOfPhysicalSlots.getFirstResponseOrFail().join();
final AllocationID allocationId = physicalSlot.getAllocationId();
final TaskManagerLocation taskManagerLocation = physicalSlot.getTaskManagerLocation();

cancelExecution(firstExecution);
sourceExecutionVertex.resetForNewExecution();

assertThat(sourceExecutionVertex.findLatestPriorAllocation()).hasValue(allocationId);
assertThat(sourceExecutionVertex.findLatestPriorLocation()).hasValue(taskManagerLocation);

final Execution secondExecution = sourceExecutionVertex.getCurrentExecutionAttempt();
cancelExecution(secondExecution);
sourceExecutionVertex.resetForNewExecution();

assertThat(sourceExecutionVertex.findLatestPriorAllocation()).hasValue(allocationId);
assertThat(sourceExecutionVertex.findLatestPriorLocation()).hasValue(taskManagerLocation);
}

private void cancelExecution(Execution execution) {
execution.cancel();
execution.completeCancelling();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -62,7 +63,7 @@ public void testGetEmptySlotProfile() throws ExecutionException, InterruptedExce
SharedSlotProfileRetriever sharedSlotProfileRetriever =
new MergingSharedSlotProfileRetrieverFactory(
EMPTY_PREFERRED_LOCATIONS_RETRIEVER,
executionVertexID -> new AllocationID(),
executionVertexID -> Optional.of(new AllocationID()),
() -> Collections.emptySet())
.createFromBulk(Collections.emptySet());

Expand Down Expand Up @@ -203,8 +204,9 @@ private static SlotProfile getSlotProfile(
new MergingSharedSlotProfileRetrieverFactory(
preferredLocationsRetriever,
executionVertexID ->
prevAllocationIDs.get(
executions.indexOf(executionVertexID)),
Optional.ofNullable(
prevAllocationIDs.get(
executions.indexOf(executionVertexID))),
() -> new HashSet<>(reservedAllocationIds))
.createFromBulk(new HashSet<>(executions));

Expand Down

0 comments on commit da29180

Please sign in to comment.