Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configurable CPU resources for Kubernetes jobs under MoK mode #16008

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ data:
|`druid.indexer.runner.peonMonitors`| `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. |`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. |`PT30S` (K8s default)|No|
|`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs that can be sent to Kubernetes. |`2147483647`|No|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use the intellij props from here so the whole table doesn't get replaced like this? https://github.com/apache/druid/blob/master/dev/druid_intellij_formatting.xml#L77-L80

intellij config instructions: https://github.com/apache/druid/blob/master/dev/intellij-setup.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

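|`druid.indexer.runner.cpuCoreInMicro`| `Integer` | CPU allocated to the task, in millicores (`1000` = 1 CPU core). Despite the property name, the value is applied with the Kubernetes `m` (milli) unit. | `1000`|No|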

### Metrics added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public class KubernetesTaskRunnerConfig
@NotNull
private List<String> javaOptsArray = ImmutableList.of();

@JsonProperty
@NotNull
private int cpuCoreInMicro = 0;

@JsonProperty
@NotNull
private Map<String, String> labels = ImmutableMap.of();
Expand Down Expand Up @@ -133,6 +137,7 @@ private KubernetesTaskRunnerConfig(
Period k8sjobLaunchTimeout,
List<String> peonMonitors,
List<String> javaOptsArray,
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
Expand Down Expand Up @@ -184,6 +189,10 @@ private KubernetesTaskRunnerConfig(
javaOptsArray,
this.javaOptsArray
);
this.cpuCoreInMicro = ObjectUtils.defaultIfNull(
cpuCoreInMicro,
this.cpuCoreInMicro
);
this.labels = ObjectUtils.defaultIfNull(
labels,
this.labels
Expand Down Expand Up @@ -264,6 +273,11 @@ public List<String> getJavaOptsArray()
return javaOptsArray;
}

/**
 * Returns the CPU setting configured for task pods.
 *
 * <p>Despite the "Micro" in the name, the task adapter renders this value as a Kubernetes quantity with
 * the {@code m} suffix (millicores). A value that is not greater than zero makes the adapter fall back to
 * {@code DruidK8sConstants.DEFAULT_CPU_MILLICORES}.
 *
 * @return the configured CPU value; {@code 0} when it was never set
 */
public int getCpuCoreInMicro()
{
return cpuCoreInMicro;
}

public Map<String, String> getLabels()
{
return labels;
Expand Down Expand Up @@ -299,6 +313,7 @@ public static class Builder
private Period k8sjobLaunchTimeout;
private List<String> peonMonitors;
private List<String> javaOptsArray;
private int cpuCoreInMicro;
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
Expand Down Expand Up @@ -379,6 +394,12 @@ public Builder withPeonMonitors(List<String> peonMonitors)
return this;
}

/**
 * Sets the CPU value that {@link #build()} passes to the config as {@code cpuCoreInMicro}.
 *
 * @param cpuCore CPU value for task pods; NOTE(review): the adapter applies it with the Kubernetes
 *                {@code m} suffix, so this is presumably millicores (1000 = 1 core) — confirm naming
 * @return this builder, for chaining
 */
public Builder withCpuCore(int cpuCore)
{
this.cpuCoreInMicro = cpuCore;
return this;
}

public Builder withJavaOptsArray(List<String> javaOptsArray)
{
this.javaOptsArray = javaOptsArray;
Expand All @@ -397,6 +418,7 @@ public Builder withAnnotations(Map<String, String> annotations)
return this;
}


public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
{
this.capacity = capacity;
Expand All @@ -419,6 +441,7 @@ public KubernetesTaskRunnerConfig build()
this.k8sjobLaunchTimeout,
this.peonMonitors,
this.javaOptsArray,
this.cpuCoreInMicro,
this.labels,
this.annotations,
this.capacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class DruidK8sConstants
public static final String TASK_DATASOURCE = "task.datasource";
public static final int PORT = 8100;
public static final int TLS_PORT = 8091;
public static final int DEFAULT_CPU_MILLICORES = 1000;
public static final String DEFAULT_JAVA_HEAP_SIZE = "1G";
public static final String TLS_ENABLED = "tls.enabled";
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,25 @@ public class PeonCommandContext
private final List<String> javaOpts;
private final File taskDir;
private final boolean enableTls;
private final int CpuMicroCore;

public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir)
public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, int CpuMicroCore)
{
this(comamnd, javaOpts, taskDir, false);
this(comamnd, javaOpts, taskDir, CpuMicroCore, false);
}

public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, boolean enableTls)
public PeonCommandContext(
List<String> comamnd,
List<String> javaOpts,
File taskDir,
int CpuMicroCore,
boolean enableTls
)
{
this.comamnd = comamnd;
this.javaOpts = javaOpts;
this.taskDir = taskDir;
this.CpuMicroCore = CpuMicroCore;
this.enableTls = enableTls;
}

Expand All @@ -66,6 +74,11 @@ public File getTaskDir()
return taskDir;
}

/**
 * Returns the CPU value this context was constructed with.
 *
 * <p>The task adapter reads it when building the main container's resource requirements.
 *
 * @return the CPU value supplied to the constructor, unchanged
 */
public int getCpuMicroCore()
{
return CpuMicroCore;
}

public boolean isEnableTls()
{
return enableTls;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public Job fromTask(Task task) throws IOException
generateCommand(task),
javaOpts(task),
taskConfig.getBaseTaskDir(),
taskRunnerConfig.getCpuCoreInMicro(),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
Expand Down Expand Up @@ -216,7 +217,7 @@ static long getContainerMemory(PeonCommandContext context)
{
List<String> javaOpts = context.getJavaOpts();
Optional<Long> optionalXmx = getJavaOptValueBytes("-Xmx", javaOpts);
long heapSize = HumanReadableBytes.parse("1g");
long heapSize = HumanReadableBytes.parse(DruidK8sConstants.DEFAULT_JAVA_HEAP_SIZE);
if (optionalXmx.isPresent()) {
heapSize = optionalXmx.get();
}
Expand Down Expand Up @@ -319,7 +320,8 @@ protected Container setupMainContainer(
mainContainer.setName("main");
ResourceRequirements requirements = getResourceRequirements(
mainContainer.getResources(),
containerSize
containerSize,
context.getCpuMicroCore()
);
mainContainer.setResources(requirements);
return mainContainer;
Expand Down Expand Up @@ -457,10 +459,13 @@ private List<String> generateCommand(Task task)
}

@VisibleForTesting
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize)
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore)
{
Map<String, Quantity> resourceMap = new HashMap<>();
resourceMap.put("cpu", new Quantity("1000", "m"));
resourceMap.put(
"cpu",
new Quantity(String.valueOf(cpuMicroCore > 0 ? cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m")
);
resourceMap.put("memory", new Quantity(String.valueOf(containerSize)));
ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
if (requirements != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception
null
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new PeonCommandContext(Collections.singletonList(
"sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"
), new ArrayList<>(), new File(taskBasePath));
PeonCommandContext context = new PeonCommandContext(
Collections.singletonList("sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"),
new ArrayList<>(),
new File(taskBasePath),
config.getCpuCoreInMicro()
);

Job job = adapter.createJobFromPodSpec(podSpec, task, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void serializingAndDeserializingATask() throws IOException
Job jobFromSpec = adapter.createJobFromPodSpec(
K8sTestUtils.getDummyPodSpec(),
task,
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"))
new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"), config.getCpuCoreInMicro())
);
client.batch().v1().jobs().inNamespace("test").create(jobFromSpec);
JobList jobList = client.batch().v1().jobs().inNamespace("test").list();
Expand Down Expand Up @@ -391,15 +391,17 @@ void testGettingContainerSize()
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
0
);
assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));

context = new PeonCommandContext(
new ArrayList<>(),
Collections.singletonList(
"-server -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/druid/data -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"),
new File("/tmp")
new File("/tmp"),
0
);
expected = (long) ((HumanReadableBytes.parse("512m") + HumanReadableBytes.parse("1g")) * 1.2);
assertEquals(expected, K8sTaskAdapter.getContainerMemory(context));
Expand Down Expand Up @@ -452,7 +454,8 @@ void testAddingMonitors() throws IOException
PeonCommandContext context = new PeonCommandContext(
new ArrayList<>(),
new ArrayList<>(),
new File("/tmp/")
new File("/tmp/"),
0
);
KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("test")
Expand Down Expand Up @@ -548,7 +551,8 @@ void testEphemeralStorageIsRespected() throws IOException
new PeonCommandContext(
Collections.singletonList("foo && bar"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedEphemeralOutput.yaml", Job.class);
Expand All @@ -572,14 +576,72 @@ void testEphemeralStorageIsRespected() throws IOException
Assertions.assertEquals(expected, actual);
}

/**
 * Verifies that the CPU value set through {@code KubernetesTaskRunnerConfig.Builder#withCpuCore} is
 * carried into the generated Job: the Job built from {@code ephemeralPodSpec.yaml} with a CPU setting of
 * 2000 must match {@code expectedCPUResourceOutput.yaml} (ignoring the {@code TASK_JSON} env var).
 */
@Test
void testCPUResourceIsRespected() throws IOException
{
  TestKubernetesClient testClient = new TestKubernetesClient(client);
  Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class);

  List<String> javaOpts = new ArrayList<>();
  javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G");
  KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
                                                                .withNamespace("test")
                                                                .withJavaOptsArray(javaOpts)
                                                                .withCpuCore(2000)
                                                                .build();

  SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
      testClient,
      config,
      taskConfig,
      startupLoggingConfig,
      node,
      jsonMapper,
      taskLogs
  );
  NoopTask task = K8sTestUtils.createTask("id", 1);
  Job actual = adapter.createJobFromPodSpec(
      pod.getSpec(),
      task,
      new PeonCommandContext(
          Collections.singletonList("foo && bar"),
          javaOpts,
          new File("/tmp"),
          config.getCpuCoreInMicro()
      )
  );
  Job expected = K8sTestUtils.fileToResource("expectedCPUResourceOutput.yaml", Job.class);

  // The compressed TASK_JSON payload is not stable across JDK versions: data compressed on a JDK older
  // than 17 can differ from what JDK 17 produces. This never matters in production, but it breaks an
  // exact comparison in the JDK 17 test runs, so the variable is dropped from both sides first.
  // Possibly related to https://bugs.openjdk.org/browse/JDK-8081450
  for (Job job : new Job[]{actual, expected}) {
    job.getSpec()
       .getTemplate()
       .getSpec()
       .getContainers()
       .get(0)
       .getEnv()
       .removeIf(x -> x.getName().equals("TASK_JSON"));
  }
  Assertions.assertEquals(expected, actual);
}


@Test
void testEphemeralStorage()
{
// no resources set.
Container container = new ContainerBuilder().build();
ResourceRequirements result = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
// requests and limits will only have 2 items, cpu / memory
assertEquals(2, result.getLimits().size());
Expand All @@ -591,7 +653,8 @@ void testEphemeralStorage()
container.setResources(new ResourceRequirementsBuilder().withRequests(requestMap).withLimits(limitMap).build());
ResourceRequirements ephemeralResult = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
// you will have ephemeral storage as well.
assertEquals(3, ephemeralResult.getLimits().size());
Expand All @@ -609,7 +672,8 @@ void testEphemeralStorage()
container.getResources().setAdditionalProperty("additional", "some-value");
ResourceRequirements additionalProperties = K8sTaskAdapter.getResourceRequirements(
container.getResources(),
100
100,
1000
);
assertEquals(1, additionalProperties.getAdditionalProperties().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ public void testMultiContainerSupport() throws IOException
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class);
Expand Down Expand Up @@ -159,9 +162,12 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class);
Expand Down Expand Up @@ -212,9 +218,12 @@ public void testOverridingPeonMonitors() throws IOException
Job actual = adapter.createJobFromPodSpec(
spec,
task,
new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp")
new PeonCommandContext(
Collections.singletonList(
"/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"),
new ArrayList<>(),
new File("/tmp"),
config.getCpuCoreInMicro()
)
);
Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void testSingleContainerSupport() throws IOException
new PeonCommandContext(
Collections.singletonList("foo && bar"),
new ArrayList<>(),
new File("/tmp")
new File("/tmp"),
config.getCpuCoreInMicro()
)
);

Expand Down
Loading
Loading