Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CPU resource configurable for Kubernates job under MoK Mode #16008

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,24 @@ data:
```

### Properties
|Property| Possible Values | Description |Default|required|
|--------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|--------|
|`druid.indexer.runner.debugJobs`| `boolean` | Clean up K8s jobs after tasks complete. |False|No|
|`druid.indexer.runner.sidecarSupport`| `boolean` | Deprecated, specify adapter type as runtime property `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer` instead. If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod. |False|No|
|`druid.indexer.runner.primaryContainerName`| `String` | If running with sidecars, the `primaryContainerName` should be that of your druid container like `druid-overlord`. |First container in `podSpec` list|No|
|`druid.indexer.runner.kubexitImage`| `String` | Used kubexit project to help shutdown sidecars when the main pod completes. Otherwise jobs with sidecars never terminate. |karlkfi/kubexit:latest|No|
|`druid.indexer.runner.disableClientProxy`| `boolean` | Use this if you have a global http(s) proxy and you wish to bypass it. |false|No|
|`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No|
|`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No|
|`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No|
|`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. |`PT1H`|No|
|`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No|
|`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No|
|`druid.indexer.runner.annotations`| `JsonObject` | Additional annotations you want to add to peon pod |`{}`|No|
|`druid.indexer.runner.peonMonitors`| `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. |`[]`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. |`PT30S` (K8s default)|No|
|`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs that can be sent to Kubernetes. |`2147483647`|No|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use the intellij props from here so the whole table doesn't get replaced like this? https://github.com/apache/druid/blob/master/dev/druid_intellij_formatting.xml#L77-L80

intellij config instructions: https://github.com/apache/druid/blob/master/dev/intellij-setup.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

|Property| Possible Values | Description | Default |required|
|--------|-----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------|--------|
|`druid.indexer.runner.debugJobs`| `boolean` | Clean up K8s jobs after tasks complete. | False |No|
|`druid.indexer.runner.sidecarSupport`| `boolean` | Deprecated, specify adapter type as runtime property `druid.indexer.runner.k8s.adapter.type: overlordMultiContainer` instead. If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod. | False |No|
|`druid.indexer.runner.primaryContainerName`| `String` | If running with sidecars, the `primaryContainerName` should be that of your druid container like `druid-overlord`. | First container in `podSpec` list |No|
|`druid.indexer.runner.kubexitImage`| `String` | Used kubexit project to help shutdown sidecars when the main pod completes. Otherwise jobs with sidecars never terminate. | karlkfi/kubexit:latest |No|
|`druid.indexer.runner.disableClientProxy`| `boolean` | Use this if you have a global http(s) proxy and you wish to bypass it. | false |No|
|`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed | `PT4H` |No|
|`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s | `P2D` |No|
|`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped | `PT10M` |No|
|`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. | `PT1H` |No|
|`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. | `-Xmx1g` |No|
|`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod | `{}` |No|
|`druid.indexer.runner.annotations`| `JsonObject` | Additional annotations you want to add to peon pod | `{}` |No|
|`druid.indexer.runner.peonMonitors`| `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. | `[]` |No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) |No|
|`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` |No|
|`druid.indexer.runner.cpuCoreInMicro`| `Integer` | Number of CPU micro core for the task. | `1000` |No|

### Metrics added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public class KubernetesTaskRunnerConfig
@NotNull
private List<String> javaOptsArray = ImmutableList.of();

@JsonProperty
@NotNull
private int cpuCoreInMicro = 0;

@JsonProperty
@NotNull
private Map<String, String> labels = ImmutableMap.of();
Expand Down Expand Up @@ -133,6 +137,7 @@ private KubernetesTaskRunnerConfig(
Period k8sjobLaunchTimeout,
List<String> peonMonitors,
List<String> javaOptsArray,
int cpuCoreInMicro,
Map<String, String> labels,
Map<String, String> annotations,
Integer capacity
Expand Down Expand Up @@ -184,6 +189,10 @@ private KubernetesTaskRunnerConfig(
javaOptsArray,
this.javaOptsArray
);
this.cpuCoreInMicro = ObjectUtils.defaultIfNull(
cpuCoreInMicro,
this.cpuCoreInMicro
);
this.labels = ObjectUtils.defaultIfNull(
labels,
this.labels
Expand Down Expand Up @@ -264,6 +273,11 @@ public List<String> getJavaOptsArray()
return javaOptsArray;
}

public int getCpuCoreInMicro()
{
return cpuCoreInMicro;
}

public Map<String, String> getLabels()
{
return labels;
Expand Down Expand Up @@ -299,6 +313,7 @@ public static class Builder
private Period k8sjobLaunchTimeout;
private List<String> peonMonitors;
private List<String> javaOptsArray;
private int cpuCoreInMicro;
private Map<String, String> labels;
private Map<String, String> annotations;
private Integer capacity;
Expand Down Expand Up @@ -379,6 +394,12 @@ public Builder withPeonMonitors(List<String> peonMonitors)
return this;
}

public Builder withCpuCore(int cpuCore)
{
this.cpuCoreInMicro = cpuCore;
return this;
}

public Builder withJavaOptsArray(List<String> javaOptsArray)
{
this.javaOptsArray = javaOptsArray;
Expand All @@ -397,6 +418,7 @@ public Builder withAnnotations(Map<String, String> annotations)
return this;
}


public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity)
{
this.capacity = capacity;
Expand All @@ -419,6 +441,7 @@ public KubernetesTaskRunnerConfig build()
this.k8sjobLaunchTimeout,
this.peonMonitors,
this.javaOptsArray,
this.cpuCoreInMicro,
this.labels,
this.annotations,
this.capacity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class DruidK8sConstants
public static final String TASK_DATASOURCE = "task.datasource";
public static final int PORT = 8100;
public static final int TLS_PORT = 8091;
public static final int DEFAULT_CPU_MILLICORES = 1000;
public static final String DEFAULT_JAVA_HEAP_SIZE = "1G";
public static final String TLS_ENABLED = "tls.enabled";
public static final String TASK_JSON_ENV = "TASK_JSON";
public static final String TASK_DIR_ENV = "TASK_DIR";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,25 @@ public class PeonCommandContext
private final List<String> javaOpts;
private final File taskDir;
private final boolean enableTls;
private final int CpuMicroCore;

public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir)
public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, int CpuMicroCore)
{
this(comamnd, javaOpts, taskDir, false);
this(comamnd, javaOpts, taskDir, CpuMicroCore, false);
}

public PeonCommandContext(List<String> comamnd, List<String> javaOpts, File taskDir, boolean enableTls)
public PeonCommandContext(
List<String> comamnd,
List<String> javaOpts,
File taskDir,
int CpuMicroCore,
boolean enableTls
)
{
this.comamnd = comamnd;
this.javaOpts = javaOpts;
this.taskDir = taskDir;
this.CpuMicroCore = CpuMicroCore;
this.enableTls = enableTls;
}

Expand All @@ -66,6 +74,11 @@ public File getTaskDir()
return taskDir;
}

public int getCpuMicroCore()
{
return CpuMicroCore;
}

public boolean isEnableTls()
{
return enableTls;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public Job fromTask(Task task) throws IOException
generateCommand(task),
javaOpts(task),
taskConfig.getBaseTaskDir(),
taskRunnerConfig.getCpuCoreInMicro(),
node.isEnableTlsPort()
);
PodSpec podSpec = pod.getSpec();
Expand Down Expand Up @@ -216,7 +217,7 @@ static long getContainerMemory(PeonCommandContext context)
{
List<String> javaOpts = context.getJavaOpts();
Optional<Long> optionalXmx = getJavaOptValueBytes("-Xmx", javaOpts);
long heapSize = HumanReadableBytes.parse("1g");
long heapSize = HumanReadableBytes.parse(DruidK8sConstants.DEFAULT_JAVA_HEAP_SIZE);
if (optionalXmx.isPresent()) {
heapSize = optionalXmx.get();
}
Expand Down Expand Up @@ -319,7 +320,8 @@ protected Container setupMainContainer(
mainContainer.setName("main");
ResourceRequirements requirements = getResourceRequirements(
mainContainer.getResources(),
containerSize
containerSize,
context.getCpuMicroCore()
);
mainContainer.setResources(requirements);
return mainContainer;
Expand Down Expand Up @@ -457,10 +459,13 @@ private List<String> generateCommand(Task task)
}

@VisibleForTesting
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize)
static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore)
{
Map<String, Quantity> resourceMap = new HashMap<>();
resourceMap.put("cpu", new Quantity("1000", "m"));
resourceMap.put(
"cpu",
new Quantity(String.valueOf(cpuMicroCore > 0 ? cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m")
);
resourceMap.put("memory", new Quantity(String.valueOf(containerSize)));
ResourceRequirementsBuilder result = new ResourceRequirementsBuilder();
if (requirements != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,12 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception
null
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new PeonCommandContext(Collections.singletonList(
"sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"
), new ArrayList<>(), new File(taskBasePath));
PeonCommandContext context = new PeonCommandContext(
Collections.singletonList("sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"),
new ArrayList<>(),
new File(taskBasePath),
config.getCpuCoreInMicro()
);

Job job = adapter.createJobFromPodSpec(podSpec, task, context);

Expand Down
Loading
Loading