Skip to content

Commit

Permalink
[FLINK-25402] Introduce ClusterOptions.PROCESS_WORKING_DIR_BASE for c…
Browse files Browse the repository at this point in the history
…onfiguring a process directory

This commit introduces the configuration option to configure a working directory for the JobManager
and TaskManager process. The resulting working directory will be <WORKING_DIR_BASE>/jm_<RESOURCE_ID>
and <WORKING_DIR_BASE>/tm_<RESOURCE_ID> respectively.
  • Loading branch information
tillrohrmann committed Jan 19, 2022
1 parent f107710 commit 09deea1
Show file tree
Hide file tree
Showing 16 changed files with 746 additions and 9 deletions.
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/cluster_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,23 @@
<td>Boolean</td>
<td>Whether to convert all PIPELINE edges to BLOCKING when apply fine-grained resource management in batch jobs.</td>
</tr>
<tr>
<td><h5>process.jobmanager.working-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Working directory for Flink JobManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to <code class="highlighter-rouge">process.working-dir</code>.</td>
</tr>
<tr>
<td><h5>process.taskmanager.working-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Working directory for Flink TaskManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to <code class="highlighter-rouge">process.working-dir</code>.</td>
</tr>
<tr>
<td><h5>process.working-dir</h5></td>
<td style="word-wrap: break-word;">io.tmp.dirs</td>
<td>String</td>
<td>Working directory for Flink processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to a randomly picked temporary directory defined via <code class="highlighter-rouge">io.tmp.dirs</code>.</td>
</tr>
</tbody>
</table>
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/expert_cluster_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,23 @@
<td><p>Enum</p></td>
<td>Defines whether cluster will handle any uncaught exceptions by just logging them (LOG mode), or by failing job (FAIL mode)<br /><br />Possible values:<ul><li>"LOG"</li><li>"FAIL"</li></ul></td>
</tr>
<tr>
<td><h5>process.jobmanager.working-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Working directory for Flink JobManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to <code class="highlighter-rouge">process.working-dir</code>.</td>
</tr>
<tr>
<td><h5>process.taskmanager.working-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Working directory for Flink TaskManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to <code class="highlighter-rouge">process.working-dir</code>.</td>
</tr>
<tr>
<td><h5>process.working-dir</h5></td>
<td style="word-wrap: break-word;">io.tmp.dirs</td>
<td>String</td>
<td>Working directory for Flink processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to a randomly picked temporary directory defined via <code class="highlighter-rouge">io.tmp.dirs</code>.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ private static List<URL> getClasspath(
}

@Override
protected void cleanupDirectories() throws IOException {
protected void cleanupDirectories(ShutdownBehaviour shutdownBehaviour) throws IOException {
// Close the packaged program explicitly to clean up temporary jars.
program.close();
super.cleanupDirectories();
super.cleanupDirectories(shutdownBehaviour);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.configuration.description.TextElement;

import static org.apache.flink.configuration.ClusterOptions.UserSystemExitMode.THROW;
import static org.apache.flink.configuration.ConfigOptions.key;
Expand Down Expand Up @@ -157,6 +158,47 @@ public class ClusterOptions {
UncaughtExceptionHandleMode.LOG.name(),
UncaughtExceptionHandleMode.FAIL.name()));

@Documentation.OverrideDefault("io.tmp.dirs")
@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
ConfigOptions.key("process.working-dir")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Working directory for Flink processes. "
+ "The working directory can be used to store information that can be used upon process recovery. "
+ "If not configured, then it will default to a randomly picked temporary directory defined via %s.",
TextElement.code(CoreOptions.TMP_DIRS.key()))
.build());

@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
public static final ConfigOption<String> JOB_MANAGER_PROCESS_WORKING_DIR_BASE =
ConfigOptions.key("process.jobmanager.working-dir")
.stringType()
.noDefaultValue()
.withFallbackKeys(PROCESS_WORKING_DIR_BASE.key())
.withDescription(
Description.builder()
.text(
"Working directory for Flink JobManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to %s.",
TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
.build());

@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
public static final ConfigOption<String> TASK_MANAGER_PROCESS_WORKING_DIR_BASE =
ConfigOptions.key("process.taskmanager.working-dir")
.stringType()
.noDefaultValue()
.withFallbackKeys(PROCESS_WORKING_DIR_BASE.key())
.withDescription(
Description.builder()
.text(
"Working directory for Flink TaskManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to %s.",
TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
.build());

public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
if (isAdaptiveSchedulerEnabled(configuration) || isReactiveModeEnabled(configuration)) {
return JobManagerOptions.SchedulerType.Adaptive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;

import javax.annotation.Nonnull;
Expand All @@ -34,6 +35,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.MetricOptions.SYSTEM_RESOURCE_METRICS;
Expand Down Expand Up @@ -73,6 +75,27 @@ public static String[] parseTempDirectories(Configuration configuration) {
return splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
}

/**
* Picks a temporary directory randomly from the given configuration.
*
* @param configuration to extract the temp directory from
* @return a randomly picked temporary directory
*/
@Nonnull
public static File getRandomTempDirectory(Configuration configuration) {
final String[] tmpDirectories = parseTempDirectories(configuration);

Preconditions.checkState(
tmpDirectories.length > 0,
String.format(
"No temporary directory has been specified for %s",
CoreOptions.TMP_DIRS.key()));

final int randomIndex = ThreadLocalRandom.current().nextInt(tmpDirectories.length);

return new File(tmpDirectories[randomIndex]);
}

/**
* Extracts the local state directories as defined by {@link
* CheckpointingOptions#LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@

import org.junit.Test;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -119,4 +124,32 @@ public void testConvertToString() {
mapElements.put(10, 20);
assertEquals("'''A:,B'':''C:,D''',10:20", ConfigurationUtils.convertToString(mapElements));
}

@Test
public void testRandomTempDirectorySelection() {
final Configuration configuration = new Configuration();
final StringBuilder tempDirectories = new StringBuilder();
final int numberTempDirectories = 20;

for (int i = 0; i < numberTempDirectories; i++) {
tempDirectories.append(UUID.randomUUID()).append(',');
}

configuration.set(CoreOptions.TMP_DIRS, tempDirectories.toString());

final Set<File> allTempDirectories =
Arrays.stream(ConfigurationUtils.parseTempDirectories(configuration))
.map(File::new)
.collect(Collectors.toSet());

final Set<File> drawnTempDirectories = new HashSet<>();
final int numberDraws = 100;

for (int i = 0; i < numberDraws; i++) {
drawnTempDirectories.add(ConfigurationUtils.getRandomTempDirectory(configuration));
}

assertThat(drawnTempDirectories).hasSizeGreaterThan(1);
assertThat(drawnTempDirectories).isSubsetOf(allTempDirectories);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
@GuardedBy("lock")
private ExecutorService ioExecutor;

@GuardedBy("lock")
private WorkingDirectory workingDirectory;

private ExecutionGraphInfoStore executionGraphInfoStore;

private final Thread shutDownHook;
Expand Down Expand Up @@ -354,6 +357,10 @@ protected void initializeServices(Configuration configuration, PluginManager plu
"Initialize cluster entrypoint {} with resource id {}.",
getClass().getSimpleName(),
resourceId);

workingDirectory =
ClusterEntrypointUtils.createJobManagerWorkingDirectory(
configuration, resourceId);
}
}

Expand Down Expand Up @@ -525,7 +532,8 @@ private CompletableFuture<ApplicationStatus> shutDownAsync(

final CompletableFuture<Void> cleanupDirectoriesFuture =
FutureUtils.runAfterwards(
rpcSystemClassLoaderCloseFuture, this::cleanupDirectories);
rpcSystemClassLoaderCloseFuture,
() -> cleanupDirectories(shutdownBehaviour));

cleanupDirectoriesFuture.whenComplete(
(Void ignored2, Throwable serviceThrowable) -> {
Expand Down Expand Up @@ -571,12 +579,37 @@ private CompletableFuture<Void> closeClusterComponent(
/**
* Clean up of temporary directories created by the {@link ClusterEntrypoint}.
*
* @param shutdownBehaviour specifying the shutdown behaviour
* @throws IOException if the temporary directories could not be cleaned up
*/
protected void cleanupDirectories() throws IOException {
protected void cleanupDirectories(ShutdownBehaviour shutdownBehaviour) throws IOException {
IOException ioException = null;

final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);

FileUtils.deleteDirectory(new File(webTmpDir));
try {
FileUtils.deleteDirectory(new File(webTmpDir));
} catch (IOException ioe) {
ioException = ioe;
}

// We only clean up the working directory if we gracefully shut down. If it is a process
// failure, then we want to keep the working directory for potential recoveries.
if (shutdownBehaviour == ShutdownBehaviour.GRACEFUL_SHUTDOWN) {
synchronized (lock) {
if (workingDirectory != null) {
try {
workingDirectory.delete();
} catch (IOException ioe) {
ioException = ExceptionUtils.firstOrSuppressed(ioe, ioException);
}
}
}
}

if (ioException != null) {
throw ioException;
}
}

// --------------------------------------------------
Expand Down Expand Up @@ -665,8 +698,11 @@ public enum ExecutionMode {
DETACHED
}

private enum ShutdownBehaviour {
/** Shutdown behaviour of a {@link ClusterEntrypoint}. */
protected enum ShutdownBehaviour {
// Graceful shutdown means that the process wants to terminate and will clean everything up
GRACEFUL_SHUTDOWN,
// Process failure means that we don't clean up things so that they could be recovered
PROCESS_FAILURE,
}
}
Loading

0 comments on commit 09deea1

Please sign in to comment.