Skip to content

Commit

Permalink
[FLINK-27140][coordination] Write job result in ioExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 14, 2022
1 parent ad2e3af commit 8d00622
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,10 +612,12 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
return handleJobManagerRunnerResult(
jobManagerRunnerResult, executionType);
} else {
return jobManagerRunnerFailed(jobId, throwable);
return CompletableFuture.completedFuture(
jobManagerRunnerFailed(jobId, throwable));
}
},
getMainThreadExecutor());
getMainThreadExecutor())
.thenCompose(Function.identity());

final CompletableFuture<Void> jobTerminationFuture =
cleanupJobStateFuture.thenCompose(
Expand Down Expand Up @@ -644,13 +646,14 @@ private Void logCleanupErrorWarning(JobID jobId, Throwable cleanupError) {
return null;
}

// NOTE(review): this span comes from a commit diff rendered without +/- markers, so the
// pre-change and post-change forms of each changed line appear back to back (two
// signatures, two variants of the first return statement).
/**
 * Translates the result of a finished job manager runner into the cleanup state for the job.
 *
 * <p>If the result reports an initialization failure and the job was started as a
 * {@code ExecutionType.RECOVERY}, the failure is passed to {@code jobManagerRunnerFailed}
 * together with the job id. In every other case the execution graph info is handed to
 * {@code jobReachedTerminalState}.
 *
 * <p>The post-change form returns a {@code CompletableFuture<CleanupJobState>}: the failure
 * branch wraps its value in an already-completed future, and the other branch returns
 * whatever future {@code jobReachedTerminalState} produces.
 *
 * @param jobManagerRunnerResult result reported by the job manager runner
 * @param executionType how the job was started; only {@code RECOVERY} is inspected here
 * @return the cleanup state to apply for the job (wrapped in a future in the post-change form)
 */
private CleanupJobState handleJobManagerRunnerResult(
private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(
JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) {
if (jobManagerRunnerResult.isInitializationFailure()
&& executionType == ExecutionType.RECOVERY) {
// pre-change form: the value of jobManagerRunnerFailed is returned directly
return jobManagerRunnerFailed(
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
jobManagerRunnerResult.getInitializationFailure());
// post-change form: the same call, wrapped in an already-completed future
return CompletableFuture.completedFuture(
jobManagerRunnerFailed(
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
jobManagerRunnerResult.getInitializationFailure()));
}
return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
}
Expand Down Expand Up @@ -985,7 +988,7 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
case GLOBAL:
return globalResourceCleaner
.cleanupAsync(jobId)
.thenRun(() -> markJobAsClean(jobId));
.thenRunAsync(() -> markJobAsClean(jobId), ioExecutor);
default:
throw new IllegalStateException("Invalid cleanup state: " + cleanupJobState);
}
Expand Down Expand Up @@ -1030,7 +1033,8 @@ protected void onFatalError(Throwable throwable) {
fatalErrorHandler.onFatalError(throwable);
}

protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
ExecutionGraphInfo executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
Expand Down Expand Up @@ -1062,35 +1066,50 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr

archiveExecutionGraph(executionGraphInfo);

if (terminalJobStatus.isGloballyTerminalState()) {
final JobID jobId = executionGraphInfo.getJobId();
try {
if (jobResultStore.hasCleanJobResultEntry(jobId)) {
log.warn(
"Job {} is already marked as clean but clean up was triggered again.",
jobId);
} else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
jobResultStore.createDirtyResult(
new JobResultEntry(
JobResult.createFrom(
executionGraphInfo.getArchivedExecutionGraph())));
log.info(
"Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
jobId);
}
} catch (IOException e) {
fatalErrorHandler.onFatalError(
new FlinkException(
String.format(
"The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
jobId),
e));
}
if (!terminalJobStatus.isGloballyTerminalState()) {
return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
}

return terminalJobStatus.isGloballyTerminalState()
? CleanupJobState.GLOBAL
: CleanupJobState.LOCAL;
final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
final JobID jobId = executionGraphInfo.getJobId();

ioExecutor.execute(
() -> {
try {
if (jobResultStore.hasCleanJobResultEntry(jobId)) {
log.warn(
"Job {} is already marked as clean but clean up was triggered again.",
jobId);
} else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
jobResultStore.createDirtyResult(
new JobResultEntry(
JobResult.createFrom(
executionGraphInfo
.getArchivedExecutionGraph())));
log.info(
"Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
jobId);
}
} catch (IOException e) {
writeFuture.completeExceptionally(e);
return;
}
writeFuture.complete(null);
});

return writeFuture.handleAsync(
(ignored, error) -> {
if (error != null) {
fatalErrorHandler.onFatalError(
new FlinkException(
String.format(
"The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
executionGraphInfo.getJobId()),
error));
}
return CleanupJobState.GLOBAL;
},
getMainThreadExecutor());
}

private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,33 @@ public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
}

// NOTE(review): this span comes from a commit diff rendered without +/- markers, so the
// pre-change (synchronous) and post-change (future-based) bodies are interleaved below.
/**
 * Runs the superclass handling for a job that reached a terminal state and additionally
 * decides whether the cluster should shut down.
 *
 * <p>{@code shutDownFuture} is completed with the application status derived from the job
 * status when the job status is globally terminal and either {@code jobCancelled} is set or
 * {@code executionMode} is {@code ClusterEntrypoint.ExecutionMode.DETACHED}.
 *
 * <p>In the post-change form this check runs inside a {@code thenApply} callback on the future
 * returned by the superclass, i.e. only after that future has completed normally.
 *
 * @param executionGraphInfo information about the finished job; its archived execution graph
 *     supplies the job status
 * @return the cleanup state computed by the superclass, passed through unchanged (as a future
 *     in the post-change form)
 */
@Override
protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
ExecutionGraphInfo executionGraphInfo) {
final ArchivedExecutionGraph archivedExecutionGraph =
executionGraphInfo.getArchivedExecutionGraph();
// pre-change form: superclass result is available synchronously
final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);

JobStatus jobStatus =
Objects.requireNonNull(
archivedExecutionGraph.getState(), "JobStatus should not be null here.");
if (jobStatus.isGloballyTerminalState()
&& (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) {
// shut down if job is cancelled or we don't have to wait for the execution result
// retrieval
log.info(
"Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}",
jobStatus,
jobCancelled,
executionMode);
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
}
// post-change form: superclass result is a future; the shutdown check is chained onto it
final CompletableFuture<CleanupJobState> cleanupHAState =
super.jobReachedTerminalState(executionGraphInfo);

// NOTE(review): thenApply is given no executor, so the callback presumably runs on whichever
// thread completes the superclass future (or the calling thread if it is already complete)
// - confirm this matches the threading expectations for jobCancelled and shutDownFuture.
return cleanupHAState.thenApply(
cleanupJobState -> {
JobStatus jobStatus =
Objects.requireNonNull(
archivedExecutionGraph.getState(),
"JobStatus should not be null here.");
if (jobStatus.isGloballyTerminalState()
&& (jobCancelled
|| executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) {
// shut down if job is cancelled or we don't have to wait for the execution
// result retrieval
log.info(
"Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}",
jobStatus,
jobCancelled,
executionMode);
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
}

// pre-change form returned the synchronous value; post-change form forwards the
// value delivered by the superclass future
return cleanupHAState;
return cleanupJobState;
});
}
}

0 comments on commit 8d00622

Please sign in to comment.