Support assign tasks to run on different categories of MiddleManagers #7066
Conversation
I suppose AutoScaler can still provision only one type of worker?

```java
{
  private final String defaultTier;
  // key: datasource, value: tier
  private final Map<String, String> tiers;
```
tiersByDatasources?
How about datasourceToTier or datasource2Tier?
@QiuMM I don't think there's a strong argument for choosing between the valueByKey and keyToValue templates; I would take whichever is more common throughout the project.
Instead of supporting only a fixed use case like tiers per datasource or tiers per indexing type, why not just support a "tier" property within the "context" section of an ingestion spec? This would allow arbitrary tier mappings, including the ones you have in mind.
The context section is already used for these purposes in queries, to set priorities, timeouts, etc. I think it would therefore also be a valid design choice to support standard context properties for ingestion tasks.
```json
{
  "type": "index_hadoop",
  "context": {
    "tier": "my-very-personal-tier-token",
    ...
  }
  ...
}
```
Good point about the autoscaler. Its interface should then probably also receive a tier name for which a scale operation should happen...
@sascha-coenen thanks for your suggestion. I had also considered this, but it would involve much more code modification, and we might need to change the current design of WorkerSelectStrategy as well as AutoScaler, which isn't worth that much effort. My solution is undoubtedly logical within the current architecture, and I think it's sufficient.
```java
  ),
  true
);
final FillCapacityWithTierSpecWorkerSelectStrategy strategy = new FillCapacityWithTierSpecWorkerSelectStrategy(
```
It seems this block is the same everywhere:

```java
final FillCapacityWithTierSpecWorkerSelectStrategy strategy = new FillCapacityWithTierSpecWorkerSelectStrategy(
    workerTierSpec);
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
    new RemoteTaskRunnerConfig(),
    WORKERS_FOR_TIER_TESTS,
    new NoopTask(null, "ds1", 1, 0, null, null, null)
);
```

Extracting a method would make the test class more concise.
Thanks for the reminder. I hadn't considered this since I have never used AutoScaler. It does need to support tiers, since different tiers might have different resource configurations; I have no idea which tier's configuration the current auto-scale strategy would use to create new workers.
Hello @QiuMM, just checking in to see if you have plans to continue working on this PR. We have a few use cases internally where this feature would fit in nicely.
@a2l007 I'm going to complete this PR next week.
@QiuMM thank you for working on this issue. I'm untagging the milestone since this issue is not necessarily a release blocker. Feel free to let me know if you think it should be.
@egor-ryashin sorry for the delay, I have made some changes.
@QiuMM did you consider a name different from "tier"? I'm concerned about the collision with historical tiers; it may be confusing. Maybe call it "class", "kind", or "category"?
@leventov sounds reasonable. I'm not sure which one is better, maybe "category"?
Do you need this patch because you're running into this on your cluster, and is the patch already running there? (Sorry, it was not clear from the PR description, and I am generally more in favor of features that come out of real-world production needs :) )
FWIW: the use of the name "tier" wasn't confusing to me when I read the PR title, given the MM context, and I wouldn't be against keeping it. But of course YMMV.
```java
    .stream()
    .filter(worker -> worker.canRunTask(task)
                      && worker.isValidVersion(workerTaskRunnerConfig.getMinWorkerVersion()))
    .collect(Collectors.toMap(w -> w.getWorker().getHost(), Function.identity()));
```
nit: would be nice to extract this into a separate method from both versions of selectWorker(..), as this is a mouthful.
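A minimal sketch of the extraction pattern being suggested, using simplified stand-in types (the real code would operate on Druid's ImmutableWorkerInfo and task objects; the names below are illustrative only):

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WorkerFilterSketch
{
  // Simplified stand-in for Druid's ImmutableWorkerInfo; field names are illustrative
  record Worker(String host, String version, int availableCapacity) {}

  // Hypothetical extracted helper shared by both selectWorker(..) overloads:
  // keeps workers that have enough free capacity and satisfy the minimum version
  static Map<String, Worker> getRunnableWorkers(
      Collection<Worker> workers,
      int requiredCapacity,
      String minVersion
  )
  {
    return workers.stream()
                  .filter(w -> w.availableCapacity() >= requiredCapacity
                               && w.version().compareTo(minVersion) >= 0)
                  .collect(Collectors.toMap(Worker::host, Function.identity()));
  }

  public static void main(String[] args)
  {
    List<Worker> workers = List.of(
        new Worker("host1", "v2", 3),
        new Worker("host2", "v1", 0)
    );
    // host2 is excluded: too old a version and no free capacity
    System.out.println(getRunnableWorkers(workers, 1, "v2").keySet()); // [host1]
  }
}
```

Both selectWorker(..) overloads would then call the helper instead of repeating the stream pipeline.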
Done.
```java
// select worker according to worker tier spec
if (workerTierSpec != null && workerTierSpec.getTierMap() != null) {
```
maybe default the tierMap in WorkerTierSpec to Collections.EMPTY_MAP so that the second null check here is not needed.
Done.
```java
final WorkerTierSpec.TierConfig tierConfig = workerTierSpec.getTierMap()
    .getOrDefault(
        task.getType(),
        new WorkerTierSpec.TierConfig(null, null)
```
why create a TierConfig with nulls when we could simply check if (tierConfig == null) next and fall through to the default action, which would happen anyway?
Fixed.
```java
final Map<String, String> tierAffinity = tierConfig.getTierAffinity();

// select worker from preferred tier
final String preferredTier = tierAffinity != null ? tierAffinity.get(task.getDataSource()) : defaultTier;
```
maybe default the tierAffinity in TierConfig to Collections.EMPTY_MAP so that the null check here is not needed.
Fixed.
```java
    @JsonProperty("strong") boolean strong
)
{
    this.tierMap = tierMap;
```
to remove null checks in other places...

```java
this.tierMap = tierMap == null ? Collections.EMPTY_MAP : tierMap;
```
Done.
```java
)
{
    this.defaultTier = defaultTier;
    this.tierAffinity = tierAffinity;
```
to remove null checks in other places...

```java
this.tierAffinity = tierAffinity == null ? Collections.EMPTY_MAP : tierAffinity;
```
Done.
```java
)
{
    this.ip = ip;
    this.capacity = capacity;
    this.version = version;
    this.tier = tier;
```
Many Druid users follow rolling upgrades (as described in http://druid.io/docs/latest/operations/rolling-updates.html). If you only upgrade the overlord, then, at the overlord, tier is going to be null, which would lead to an NPE in WorkerSelectUtils.getTierWorkers(..) at the line "Maps.filterValues(workerMap, workerInfo -> workerInfo.getWorker().getTier().equals(tier))", and maybe in some other places as well.
That should probably be fixed, or something needs to go in the release notes along the lines of making sure the new tier-based strategies are only used once the overlord has been restarted at least once after all middleManagers were upgraded (due to a second cluster upgrade, a forced restart of the overlord after the MM upgrade, or whatever).
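One way to make the comparison null-safe is simply to flip the equals call so the known-non-null tier is the receiver. This is an illustrative sketch of that pattern with plain maps, not the actual Druid code; the method and variable names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class NullSafeTierFilter
{
  // Illustrative: worker host -> reported tier; a not-yet-upgraded worker reports null
  static Map<String, String> getTierWorkers(Map<String, String> workerTiers, String tier)
  {
    // tier.equals(workerTier) rather than workerTier.equals(tier):
    // a null tier value is simply filtered out instead of throwing an NPE
    return workerTiers.entrySet().stream()
                      .filter(e -> tier.equals(e.getValue()))
                      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args)
  {
    Map<String, String> workerTiers = new HashMap<>();
    workerTiers.put("mm1", "tier_a");
    workerTiers.put("mm2", null); // old middleManager, no tier reported yet
    System.out.println(getTierWorkers(workerTiers, "tier_a").keySet()); // [mm1]
  }
}
```

With the flipped comparison, a mid-rolling-upgrade worker that has not reported a tier is quietly excluded rather than crashing worker selection.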
If users want to use this feature, they must upgrade the overlord and all middleManagers (following the rolling upgrade procedure is fine). I have noted this in the documentation. An extra restart of the overlord is not necessary.
@himanshug yes, this patch is already running on my cluster.
@QiuMM "category" sounds good.
@QiuMM great, thanks.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
Hi @QiuMM, any chance you have some time to spare to resolve conflicts and address the outstanding review? This seems like a rather useful feature for Druid and not so far from being finished. I would hate to see it get killed by the stale bot.
This issue is no longer marked as stale.
@QiuMM It would be great if we could push this into 0.16.0, as we're planning to use this feature once it is available. Would it be possible for you to finish this up at your earliest convenience?
Hey @QiuMM, it looks like of all the open PRs, this one has the most positive reactions up top ❤️ Any chance you are interested in wrapping it up?
Oops, I had forgotten about this, sorry. I'll update it this week @sascha-coenen @gianm
I'm buried in work this week. I'll update it next week :(
That makes sense to me. I think realtime tasks may require a cluster with a different configuration than batch ones. So the overlord could probably accept an array of worker config specs via a dynamic configuration API call.
@VladimirIordanov Great! I have never used AutoScaler; I think we can finish this PR first, then you can open a new PR to add the features you want.
The CI build failed:
I have checked the docs and can't find any errors; can anyone help me?
overall lgtm 👍
docs/configuration/index.md
|Property|Description|Default|
|--------|-----------|-------|
|`type`|`equalDistributionWithCategorySpec`.|required; must be `equalDistributionWithCategorySpec`|
|`workerCategorySpec`|[Worker Category Spec](#WorkerCategorySpec) object|null (no worker category spec)|
it looks like these links should be all lower case, e.g. #workercategoryspec, I think that should fix CI
LGTM when build is successful. Thanks @QiuMM
Aw, now it is triggering false spelling errors for the config names: https://travis-ci.org/apache/incubator-druid/jobs/596417943#L274. You can add entries here https://github.com/apache/incubator-druid/blob/master/website/.spelling to make it ignore them. I'm +1 after CI too, though.
@clintropolis It seems that didn't work.
Oops, I guess you added it to the wrong part of the file, so the exclusions are applying to
The first part of the file is exclusions that apply to everything, and then after that we have file-specific exclusions, but I'm not really sure why we have it split up like that instead of just one big global list.
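For anyone hitting the same CI failure: the layout being described is roughly the following (an illustrative sketch, not the actual file contents; the words and path are placeholders). Global exclusions go at the top, and per-file sections each start with a line naming the file, so a word added under the wrong file heading will not apply elsewhere:

```
 # global exclusions: apply to every doc file
someGlobalWord
 - docs/configuration/index.md
 # exclusions below apply only to docs/configuration/index.md
equalDistributionWithCategorySpec
workerCategorySpec
```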
CI has passed, thank you very much @clintropolis.
Very excited for this! We are working on making it easier to do rolling upgrades of our middlemanagers. As part of that we're limiting the task durations of our
Just FYI to whoever showed interest in this feature: the following proposal is adding support for this. An implementation is available too, here: #8989. If you are interested in seeing this feature in Druid as well, feel free to take a look and upvote the proposal.
Motivation

- Tasks of different types might need different resources to run, e.g. realtime tasks need much more CPU and memory than Hadoop batch tasks or kill tasks. If all MiddleManager workers share the same resource configuration, much of it is wasted.
- To support cross-data-center deployments, different datasources' tasks might need to run in different data centers.

The current worker select strategy uses `affinityConfig` to specify that different datasources run on different workers. However, this requires maintaining the workers' IP and port information, which is fragile since the IP and port might change, especially when running MiddleManagers in the cloud. Besides, `affinityConfig` cannot assign tasks to different workers based on task type.

Solution

Separate MiddleManager nodes into categories, then run tasks on different categories based on task type and datasource name. First, add a category property to the MiddleManager worker. Then, implement `WorkerSelectStrategy` to support assigning tasks to different categories. This PR provides two select strategies, named `EqualDistributionWithCategorySpecWorkerSelectStrategy` and `FillCapacityWithCategorySpecWorkerSelectStrategy`. You can review the docs in this PR for usage details.
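To make the shape of the feature concrete, a worker select strategy configuration under this design might look roughly like the following. This is an illustrative sketch pieced together from the discussion above (the `strong` flag, the per-task-type map, and the default/affinity fields appear in the reviewed diffs); the task-type key and category names are placeholders, and the docs in this PR are the authoritative reference:

```json
{
  "selectStrategy": {
    "type": "equalDistributionWithCategorySpec",
    "workerCategorySpec": {
      "strong": true,
      "categoryMap": {
        "index_kafka": {
          "defaultCategory": "realtime-category",
          "categoryAffinity": {
            "some_datasource": "high-mem-category"
          }
        }
      }
    }
  }
}
```

With a spec like this, tasks of the mapped type go to their category's workers (a datasource-level affinity overriding the type-level default), and `strong` controls whether tasks may fall back to other categories when none of the preferred workers have capacity.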