Skip to content

Commit

Permalink
[FLINK-21480][yarn][k8s] Request external resource according to TaskE…
Browse files Browse the repository at this point in the history
…xecutorProcessSpec

This closes apache#15112
  • Loading branch information
KarmaGYZ authored and xintongsong committed Mar 17, 2021
1 parent 6d95b78 commit b0467af
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(
dynamicProperties,
jvmMemOpts,
taskManagerParameters,
ExternalResourceUtils.getExternalResources(
ExternalResourceUtils.getExternalResourceConfigurationKeys(
flinkConfig,
KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private Container decorateMainContainer(Container container) {
requirementsInPodTemplate,
kubernetesJobManagerParameters.getJobManagerMemoryMB(),
kubernetesJobManagerParameters.getJobManagerCPU(),
Collections.emptyMap(),
Collections.emptyMap());
mainContainerBuilder
.withName(Constants.MAIN_CONTAINER_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ private Container decorateMainContainer(Container container) {
requirementsInPodTemplate,
kubernetesTaskManagerParameters.getTaskManagerMemoryMB(),
kubernetesTaskManagerParameters.getTaskManagerCPU(),
kubernetesTaskManagerParameters.getTaskManagerExternalResources());
kubernetesTaskManagerParameters.getTaskManagerExternalResources(),
kubernetesTaskManagerParameters.getTaskManagerExternalResourceConfigKeys());
final String image =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.kubernetes.kubeclient.parameters;

import org.apache.flink.api.common.resources.ExternalResource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
Expand Down Expand Up @@ -46,21 +47,22 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter

private final ContaineredTaskManagerParameters containeredTaskManagerParameters;

private final Map<String, Long> taskManagerExternalResources;
private final Map<String, String> taskManagerExternalResourceConfigKeys;

public KubernetesTaskManagerParameters(
Configuration flinkConfig,
String podName,
String dynamicProperties,
String jvmMemOptsEnv,
ContaineredTaskManagerParameters containeredTaskManagerParameters,
Map<String, Long> taskManagerExternalResources) {
Map<String, String> taskManagerExternalResourceConfigKeys) {
super(flinkConfig);
this.podName = checkNotNull(podName);
this.dynamicProperties = checkNotNull(dynamicProperties);
this.jvmMemOptsEnv = checkNotNull(jvmMemOptsEnv);
this.containeredTaskManagerParameters = checkNotNull(containeredTaskManagerParameters);
this.taskManagerExternalResources = checkNotNull(taskManagerExternalResources);
this.taskManagerExternalResourceConfigKeys =
checkNotNull(taskManagerExternalResourceConfigKeys);
}

@Override
Expand Down Expand Up @@ -120,12 +122,16 @@ public double getTaskManagerCPU() {
.doubleValue();
}

public Map<String, ExternalResource> getTaskManagerExternalResources() {
return containeredTaskManagerParameters.getTaskExecutorProcessSpec().getExtendedResources();
}

public String getServiceAccount() {
return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT);
}

public Map<String, Long> getTaskManagerExternalResources() {
return taskManagerExternalResources;
public Map<String, String> getTaskManagerExternalResourceConfigKeys() {
return Collections.unmodifiableMap(taskManagerExternalResourceConfigKeys);
}

public int getRPCPort() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.kubernetes.utils;

import org.apache.flink.api.common.resources.ExternalResource;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
Expand All @@ -41,6 +42,7 @@
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
Expand Down Expand Up @@ -307,13 +309,15 @@ public static CompletedCheckpointStore createCompletedCheckpointStore(
* @param mem Memory in mb.
* @param cpu cpu.
* @param externalResources external resources
* @param externalResourceConfigKeys config keys of external resources
* @return KubernetesResource requirements.
*/
public static ResourceRequirements getResourceRequirements(
ResourceRequirements resourceRequirements,
int mem,
double cpu,
Map<String, Long> externalResources) {
Map<String, ExternalResource> externalResources,
Map<String, String> externalResourceConfigKeys) {
final Quantity cpuQuantity = new Quantity(String.valueOf(cpu));
final Quantity memQuantity = new Quantity(mem + Constants.RESOURCE_UNIT_MB);

Expand All @@ -325,16 +329,20 @@ public static ResourceRequirements getResourceRequirements(
.addToLimits(Constants.RESOURCE_NAME_CPU, cpuQuantity);

// Add the external resources to resource requirement.
for (Map.Entry<String, Long> externalResource : externalResources.entrySet()) {
final Quantity resourceQuantity =
new Quantity(String.valueOf(externalResource.getValue()));
resourceRequirementsBuilder
.addToRequests(externalResource.getKey(), resourceQuantity)
.addToLimits(externalResource.getKey(), resourceQuantity);
LOG.info(
"Request external resource {} with config key {}.",
resourceQuantity.getAmount(),
externalResource.getKey());
for (Map.Entry<String, ExternalResource> externalResource : externalResources.entrySet()) {
final String configKey = externalResourceConfigKeys.get(externalResource.getKey());
if (!StringUtils.isNullOrWhitespaceOnly(configKey)) {
final Quantity resourceQuantity =
new Quantity(
String.valueOf(externalResource.getValue().getValue().longValue()));
resourceRequirementsBuilder
.addToRequests(configKey, resourceQuantity)
.addToLimits(configKey, resourceQuantity);
LOG.info(
"Request external resource {} with config key {}.",
resourceQuantity.getAmount(),
configKey);
}
}

return resourceRequirementsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected void onSetup() throws Exception {
DYNAMIC_PROPERTIES,
JVM_MEM_OPTS_ENV,
containeredTaskManagerParameters,
ExternalResourceUtils.getExternalResources(
ExternalResourceUtils.getExternalResourceConfigurationKeys(
flinkConfig,
KubernetesConfigOptions
.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,68 +64,52 @@ private static Set<String> getExternalResourceSet(Configuration config) {
}

/**
* Get the external resources map. The key should be used for deployment specific container
* request, and values should be the amount of that resource.
* Get the external resource configuration keys map, indexed by the resource name. The
* configuration key should be used for deployment specific container request.
*
* @param config Configurations
* @param suffix suffix of config option for deployment specific configuration key
* @return external resources map, map the amount to the configuration key for deployment
* specific container request
* @return external resource configuration keys map, map the resource name to the configuration
* key for deployment * specific container request
*/
public static Map<String, Long> getExternalResources(Configuration config, String suffix) {
public static Map<String, String> getExternalResourceConfigurationKeys(
Configuration config, String suffix) {
final Set<String> resourceSet = getExternalResourceSet(config);
final Map<String, String> configKeysToResourceNameMap = new HashMap<>();
LOG.info("Enabled external resources: {}", resourceSet);

if (resourceSet.isEmpty()) {
return Collections.emptyMap();
}

final Map<String, Long> externalResourceConfigs = new HashMap<>();
final Map<String, String> externalResourceConfigs = new HashMap<>();
for (String resourceName : resourceSet) {
final ConfigOption<Long> amountOption =
key(ExternalResourceOptions.getAmountConfigOptionForResource(resourceName))
.longType()
.noDefaultValue();
final ConfigOption<String> configKeyOption =
key(ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
resourceName, suffix))
.stringType()
.noDefaultValue();
final String configKey = config.getString(configKeyOption);
final Optional<Long> amountOpt = config.getOptional(amountOption);
final String configKey = config.get(configKeyOption);

if (StringUtils.isNullOrWhitespaceOnly(configKey)) {
LOG.warn(
"Could not find valid {} for {}. Will ignore that resource.",
configKeyOption.key(),
resourceName);
continue;
}
if (!amountOpt.isPresent()) {
LOG.warn(
"The amount of the {} should be configured. Will ignore that resource.",
resourceName);
continue;
} else if (amountOpt.get() <= 0) {
LOG.warn(
"The amount of the {} should be positive while finding {}. Will ignore that resource.",
amountOpt.get(),
resourceName);
continue;
}

if (externalResourceConfigs.put(configKey, amountOpt.get()) != null) {
LOG.warn(
"Duplicate config key {} occurred for external resources, the one named {} with amount {} will overwrite the value.",
configKey,
resourceName,
amountOpt);
} else {
LOG.info(
"Add external resource config for {} with key {} value {}.",
resourceName,
configKeysToResourceNameMap.compute(
configKey,
amountOpt);
(ignored, previousResource) -> {
if (previousResource != null) {
LOG.warn(
"Duplicate config key {} occurred for external resources, the one named {} will overwrite the value.",
configKey,
resourceName);
externalResourceConfigs.remove(previousResource);
}
return resourceName;
});
externalResourceConfigs.put(resourceName, configKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,75 +60,27 @@ public class ExternalResourceUtilsTest extends TestLogger {
private static final String SUFFIX = "flink.config-key";

@Test
public void testGetExternalResourcesWithConfigKeyNotSpecifiedOrEmpty() {
public void testGetExternalResourceConfigurationKeysWithConfigKeyNotSpecifiedOrEmpty() {
final Configuration config = new Configuration();
final String resourceConfigKey = "";

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
RESOURCE_AMOUNT_1);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_2),
RESOURCE_AMOUNT_2);
config.setString(
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
RESOURCE_NAME_1, SUFFIX),
resourceConfigKey);

final Map<String, Long> configMap =
ExternalResourceUtils.getExternalResources(config, SUFFIX);
final Map<String, String> configMap =
ExternalResourceUtils.getExternalResourceConfigurationKeys(config, SUFFIX);

assertThat(configMap.entrySet(), is(empty()));
}

@Test
public void testGetExternalResourcesWithIllegalAmount() {
public void testGetExternalResourceConfigurationKeysWithConflictConfigKey() {
final Configuration config = new Configuration();
final long resourceAmount = 0L;

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
resourceAmount);
config.setString(
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
RESOURCE_NAME_1, SUFFIX),
RESOURCE_CONFIG_KEY_1);

final Map<String, Long> configMap =
ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.entrySet(), is(empty()));
}

@Test
public void testGetExternalResourcesWithoutConfigAmount() {
final Configuration config = new Configuration();

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setString(
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
RESOURCE_NAME_1, SUFFIX),
RESOURCE_CONFIG_KEY_1);

final Map<String, Long> configMap =
ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.entrySet(), is(empty()));
}

@Test
public void testGetExternalResourcesWithConflictConfigKey() {
final Configuration config = new Configuration();

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
RESOURCE_AMOUNT_1);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_2),
RESOURCE_AMOUNT_2);
config.setString(
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
RESOURCE_NAME_1, SUFFIX),
Expand All @@ -138,42 +90,12 @@ public void testGetExternalResourcesWithConflictConfigKey() {
RESOURCE_NAME_2, SUFFIX),
RESOURCE_CONFIG_KEY_1);

final Map<String, Long> configMap =
ExternalResourceUtils.getExternalResources(config, SUFFIX);
final Map<String, String> configMap =
ExternalResourceUtils.getExternalResourceConfigurationKeys(config, SUFFIX);

// Only one of the config key would be kept.
// Only one of the resource name would be kept.
assertThat(configMap.size(), is(1));
assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1));
}

@Test
public void testGetExternalResourcesWithMultipleExternalResource() {
final Configuration config = new Configuration();

config.set(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, RESOURCE_LIST);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1),
RESOURCE_AMOUNT_1);
config.setLong(
ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_2),
RESOURCE_AMOUNT_2);
config.setString(
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
RESOURCE_NAME_1, SUFFIX),
RESOURCE_CONFIG_KEY_1);
config.setString(
ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource(
RESOURCE_NAME_2, SUFFIX),
RESOURCE_CONFIG_KEY_2);

final Map<String, Long> configMap =
ExternalResourceUtils.getExternalResources(config, SUFFIX);

assertThat(configMap.size(), is(2));
assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_1));
assertTrue(configMap.containsKey(RESOURCE_CONFIG_KEY_2));
assertThat(configMap.get(RESOURCE_CONFIG_KEY_1), is(RESOURCE_AMOUNT_1));
assertThat(configMap.get(RESOURCE_CONFIG_KEY_2), is(RESOURCE_AMOUNT_2));
assertThat(configMap.values(), contains(RESOURCE_CONFIG_KEY_1));
}

@Test
Expand Down
Loading

0 comments on commit b0467af

Please sign in to comment.