Skip to content

Commit

Permalink
[FLINK-24270][checkpoint] Refactor the tests related to the VertexFin…
Browse files Browse the repository at this point in the history
…ishedStateChecker.

This closes apache#18196.
  • Loading branch information
gaoyunhaii committed Feb 1, 2022
1 parent fb14d4d commit e54c59e
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand Down Expand Up @@ -199,6 +200,12 @@ public class CheckpointCoordinator {
/** Optional tracker for checkpoint statistics. */
private final CheckpointStatsTracker statsTracker;

private final BiFunction<
Set<ExecutionJobVertex>,
Map<OperatorID, OperatorState>,
VertexFinishedStateChecker>
vertexFinishedStateCheckerFactory;

/** Id of checkpoint for which in-flight data should be ignored on recovery. */
private final long checkpointIdOfIgnoredInFlightData;

Expand All @@ -218,6 +225,7 @@ public class CheckpointCoordinator {
private final ExecutionAttemptMappingProvider attemptMappingProvider;

private boolean baseLocationsForCheckpointInitialized = false;

private boolean forceFullSnapshot;

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -251,7 +259,8 @@ public CheckpointCoordinator(
checkpointPlanCalculator,
attemptMappingProvider,
SystemClock.getInstance(),
statsTracker);
statsTracker,
VertexFinishedStateChecker::new);
}

@VisibleForTesting
Expand All @@ -269,7 +278,12 @@ public CheckpointCoordinator(
CheckpointPlanCalculator checkpointPlanCalculator,
ExecutionAttemptMappingProvider attemptMappingProvider,
Clock clock,
CheckpointStatsTracker statsTracker) {
CheckpointStatsTracker statsTracker,
BiFunction<
Set<ExecutionJobVertex>,
Map<OperatorID, OperatorState>,
VertexFinishedStateChecker>
vertexFinishedStateCheckerFactory) {

// sanity checks
checkNotNull(checkpointStorage);
Expand Down Expand Up @@ -344,6 +358,7 @@ public CheckpointCoordinator(
this.pendingCheckpoints::size,
this.checkpointsCleaner::getNumberOfCheckpointsToClean);
this.statsTracker = checkNotNull(statsTracker, "Statistic tracker can not be null");
this.vertexFinishedStateCheckerFactory = checkNotNull(vertexFinishedStateCheckerFactory);
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1607,7 +1622,7 @@ private OptionalLong restoreLatestCheckpointedStateInternal(

if (checkForPartiallyFinishedOperators) {
VertexFinishedStateChecker vertexFinishedStateChecker =
new VertexFinishedStateChecker(tasks, operatorStates);
vertexFinishedStateCheckerFactory.apply(tasks, operatorStates);
vertexFinishedStateChecker.validateOperatorsFinishedState();
}

Expand Down
Loading

0 comments on commit e54c59e

Please sign in to comment.