
Commit

Customize node scheduling for fix and source stages
yuananf committed Jun 25, 2015
1 parent 97f3ff6 commit 32c7b6e
Showing 4 changed files with 125 additions and 18 deletions.
@@ -17,6 +17,8 @@
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.HashMultimap;
@@ -62,6 +64,11 @@ public class NodeScheduler
private final NodeTaskMap nodeTaskMap;
private final boolean doubleScheduling;

private final String reportNodes;
private final List<String> reportNodeList;
private final boolean schedulerFixToReport;
private final boolean schedulerSourceToReport;

@Inject
public NodeScheduler(NodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap)
{
@@ -75,6 +82,11 @@ public NodeScheduler(NodeManager nodeManager, NodeSchedulerConfig config, NodeTa
this.maxSplitsPerNodePerTaskWhenFull = config.getMaxPendingSplitsPerNodePerTask();
this.nodeTaskMap = checkNotNull(nodeTaskMap, "nodeTaskMap is null");
checkArgument(maxSplitsPerNode > maxSplitsPerNodePerTaskWhenFull, "maxSplitsPerNode must be > maxSplitsPerNodePerTaskWhenFull");

this.reportNodes = config.getReportNodes();
this.schedulerFixToReport = config.getSchedulerFixToReport();
this.schedulerSourceToReport = config.getSchedulerSourceToReport();
this.reportNodeList = Splitter.on(",").splitToList(reportNodes);
}

@Managed
@@ -165,18 +177,22 @@ public Node selectCurrentNode()
return nodeManager.getCurrentNode();
}

public List<Node> selectRandomNodes(int limit)
public List<Node> selectRandomNodes(int limit, final PlanFragment.PlanDistribution pd)
{
return selectNodes(limit, randomizedNodes());
return selectNodes(limit, randomizedNodes(), pd);
}

private List<Node> selectNodes(int limit, Iterator<Node> candidates)
private List<Node> selectNodes(int limit, Iterator<Node> candidates, final PlanFragment.PlanDistribution pd)
{
checkArgument(limit > 0, "limit must be at least 1");

List<Node> selected = new ArrayList<>(limit);
while (selected.size() < limit && candidates.hasNext()) {
selected.add(candidates.next());
Node node = candidates.next();
if (scheduleReportNodeIfNecessary(node, pd)) {
continue;
}
selected.add(node);
}

if (doubleScheduling && !selected.isEmpty()) {
@@ -194,6 +210,40 @@ private List<Node> selectNodes(int limit, Iterator<Node> candidates)
return selected;
}

public List<Node> selectReportNode(int limit)
{
if (NodeSchedulerConfig.REPORT_NODE_DEFAULT.equals(reportNodes)) {
return selectRandomNodes(limit, PlanFragment.PlanDistribution.SINGLE);
}
else {
checkArgument(limit > 0, "limit must be at least 1");
ImmutableList<Node> nodes = nodeMap.get().get().getNodesByHostAndPort().values().stream()
.filter(node -> reportNodeList.contains(node.getHostAndPort().getHostText()))
.collect(toImmutableList());
return selectNodes(limit, new ResettableRandomizedIterator<>(nodes), PlanFragment.PlanDistribution.SINGLE);
}
}

/**
 * Check whether the given node is a report node that should be skipped
 * (not scheduled) for the given plan distribution.
 */
public boolean scheduleReportNodeIfNecessary(Node node, final PlanFragment.PlanDistribution pd)
{
boolean skipFlag = false;
if (NodeSchedulerConfig.REPORT_NODE_DEFAULT.equals(reportNodes)) {
return skipFlag;
}
else {
if (reportNodeList != null && reportNodeList.contains(node.getHostAndPort().getHostText())) {
if ((!schedulerFixToReport && PlanFragment.PlanDistribution.FIXED == pd)
|| (!schedulerSourceToReport && PlanFragment.PlanDistribution.SOURCE == pd)) {
skipFlag = true;
}
}
return skipFlag;
}
}

/**
* Identifies the nodes for running the specified splits.
*
@@ -202,7 +252,7 @@ private List<Node> selectNodes(int limit, Iterator<Node> candidates)
* If we cannot find an assignment for a split, it is not included in the map.
*/
public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks,
boolean controlScanConcurrencyEnabled, int scanConcurrencyCount)
final PlanFragment.PlanDistribution pd, boolean controlScanConcurrencyEnabled, int scanConcurrencyCount)
{
Multimap<Node, Split> assignment = HashMultimap.create();
Map<Node, Integer> assignmentCount = new HashMap<>();
@@ -230,11 +280,11 @@ public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<Remo
for (Split split : splits) {
List<Node> candidateNodes;
if (locationAwareScheduling || !split.isRemotelyAccessible()) {
candidateNodes = selectCandidateNodes(nodeMap.get().get(), split);
candidateNodes = selectCandidateNodes(nodeMap.get().get(), split, pd);
}
else {
randomCandidates.reset();
candidateNodes = selectNodes(minCandidates, randomCandidates);
candidateNodes = selectNodes(minCandidates, randomCandidates, pd);
}
checkCondition(!candidateNodes.isEmpty(), NO_NODES_AVAILABLE, "No nodes available to run query");

@@ -288,7 +338,7 @@ private ResettableRandomizedIterator<Node> randomizedNodes()
return new ResettableRandomizedIterator<>(nodes);
}

private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split, final PlanFragment.PlanDistribution pd)
{
Set<Node> chosen = new LinkedHashSet<>(minCandidates);
String coordinatorIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();
@@ -297,6 +347,7 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
for (HostAddress hint : split.getAddresses()) {
nodeMap.getNodesByHostAndPort().get(hint).stream()
.filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
.filter(node -> !scheduleReportNodeIfNecessary(node, pd))
.filter(chosen::add)
.forEach(node -> scheduleLocal.incrementAndGet());

@@ -314,6 +365,7 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
if (!hint.hasPort() || split.isRemotelyAccessible()) {
nodeMap.getNodesByHost().get(address).stream()
.filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
.filter(node -> !scheduleReportNodeIfNecessary(node, pd))
.filter(chosen::add)
.forEach(node -> scheduleLocal.incrementAndGet());
}
@@ -332,6 +384,9 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
}
for (Node node : nodeMap.getNodesByRack().get(Rack.of(address))) {
if (includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
if (scheduleReportNodeIfNecessary(node, pd)) {
continue;
}
if (chosen.add(node)) {
scheduleRack.incrementAndGet();
}
@@ -352,6 +407,9 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
ResettableRandomizedIterator<Node> randomizedIterator = randomizedNodes();
while (randomizedIterator.hasNext()) {
Node node = randomizedIterator.next();
if (scheduleReportNodeIfNecessary(node, pd)) {
continue;
}
if (chosen.add(node)) {
scheduleRandom.incrementAndGet();
}
@@ -27,6 +27,11 @@ public class NodeSchedulerConfig
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerNodePerTask = 10;

public static final String REPORT_NODE_DEFAULT = "NA";
private String reportNodes = REPORT_NODE_DEFAULT;
private boolean schedulerFixToReport = true;
private boolean schedulerSourceToReport = true;

public boolean isMultipleTasksPerNodeEnabled()
{
return multipleTasksPerNode;
@@ -100,4 +105,40 @@ public NodeSchedulerConfig setMaxSplitsPerNode(int maxSplitsPerNode)
this.maxSplitsPerNode = maxSplitsPerNode;
return this;
}

public String getReportNodes()
{
return reportNodes;
}

@Config("node-scheduler.report-nodes")
public NodeSchedulerConfig setReportNodes(String reportNodes)
{
this.reportNodes = reportNodes;
return this;
}

public boolean getSchedulerFixToReport()
{
return schedulerFixToReport;
}

@Config("node-scheduler.scheduler-fix-to-report")
public NodeSchedulerConfig setSchedulerFixToReport(boolean schedulerFixToReport)
{
this.schedulerFixToReport = schedulerFixToReport;
return this;
}

public boolean getSchedulerSourceToReport()
{
return schedulerSourceToReport;
}

@Config("node-scheduler.scheduler-source-to-report")
public NodeSchedulerConfig setSchedulerSourceToReport(boolean schedulerSourceToReport)
{
this.schedulerSourceToReport = schedulerSourceToReport;
return this;
}
}
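
The three @Config properties above map to coordinator configuration keys. A minimal sketch of how they might be set (the host names are hypothetical and must match what the workers register via getHostAndPort(); leaving report-nodes at its "NA" default keeps the feature inactive):

node-scheduler.report-nodes=report-node-1.example.com,report-node-2.example.com
node-scheduler.scheduler-fix-to-report=false
node-scheduler.scheduler-source-to-report=false

With both flags set to false, FIXED and SOURCE stages skip the listed hosts, leaving them to the single-node report stage chosen by selectReportNode; leaving a flag at its default of true lets that stage type land on the report nodes as before.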
@@ -440,7 +440,13 @@ else if (fragment.getDistribution() == PlanDistribution.COORDINATOR_ONLY) {
private void scheduleFixedNodeCount(int nodeCount)
{
// create tasks on "nodeCount" nodes; a single-node stage prefers the configured report nodes, otherwise random nodes
List<Node> nodes = nodeSelector.selectRandomNodes(nodeCount);
List<Node> nodes;
if (nodeCount == 1) {
nodes = nodeSelector.selectReportNode(nodeCount);
}
else {
nodes = nodeSelector.selectRandomNodes(nodeCount, PlanDistribution.FIXED);
}
checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
ImmutableList.Builder<TaskId> tasks = ImmutableList.builder();
for (int taskId = 0; taskId < nodes.size(); taskId++) {
@@ -487,7 +493,8 @@ private void scheduleSourcePartitionedNodes()
stateMachine.recordGetSplitTime(start);

while (!pendingSplits.isEmpty() && !getState().isDone()) {
Multimap<Node, Split> splitAssignment = nodeSelector.computeAssignments(pendingSplits, tasks.values(), controlScanConcurrencyEnabled, scanConcurrencyCount);
Multimap<Node, Split> splitAssignment = nodeSelector.computeAssignments(pendingSplits, tasks.values(),
PlanDistribution.SOURCE, controlScanConcurrencyEnabled, scanConcurrencyCount);
pendingSplits = ImmutableSet.copyOf(Sets.difference(pendingSplits, ImmutableSet.copyOf(splitAssignment.values())));

assignSplits(nextTaskId, splitAssignment);
@@ -20,6 +20,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.sql.planner.PlanFragment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -100,7 +101,7 @@ public void testLocationAwareSchedulingDisabledScheduleLocal()
Split split = new Split("foo", new TestSplitLocal());
Set<Split> splits = ImmutableSet.of(split);

Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(selector.computeAssignments(splits, taskMap.values(), false, 1).entries());
Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(selector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1).entries());
assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
assertEquals(assignment.getValue(), split);
}
@@ -112,7 +113,7 @@ public void testScheduleLocal()
Split split = new Split("foo", new TestSplitLocal());
Set<Split> splits = ImmutableSet.of(split);

Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(nodeSelector.computeAssignments(splits, taskMap.values(), false, 1).entries());
Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1).entries());
assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
assertEquals(assignment.getValue(), split);
}
@@ -127,13 +128,13 @@ public void testMultipleTasksPerNode()

NodeScheduler nodeScheduler = new NodeScheduler(nodeManager, nodeSchedulerConfig, nodeTaskMap);
NodeScheduler.NodeSelector nodeSelector = nodeScheduler.createNodeSelector("foo");
List<Node> nodes = nodeSelector.selectRandomNodes(10);
List<Node> nodes = nodeSelector.selectRandomNodes(10, PlanFragment.PlanDistribution.FIXED);
assertEquals(nodes.size(), 3);

nodeSchedulerConfig.setMultipleTasksPerNodeEnabled(true);
nodeScheduler = new NodeScheduler(nodeManager, nodeSchedulerConfig, nodeTaskMap);
nodeSelector = nodeScheduler.createNodeSelector("foo");
nodes = nodeSelector.selectRandomNodes(9);
nodes = nodeSelector.selectRandomNodes(9, PlanFragment.PlanDistribution.FIXED);
assertEquals(nodes.size(), 9);
Map<String, Integer> counts = new HashMap<>();
for (Node node : nodes) {
@@ -151,7 +152,7 @@ public void testScheduleRemote()
{
Set<Split> splits = new HashSet<>();
splits.add(new Split("foo", new TestSplitRemote()));
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);
assertEquals(assignments.size(), 1);
}

@@ -164,7 +165,7 @@ public void testBasicAssignment()
for (int i = 0; i < 3; i++) {
splits.add(new Split("foo", new TestSplitRemote()));
}
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);
assertEquals(assignments.entries().size(), 3);
for (Node node : nodeManager.getActiveDatasourceNodes("foo")) {
assertTrue(assignments.keySet().contains(node));
@@ -194,7 +195,7 @@ public void testMaxSplitsPerNode()
for (int i = 0; i < 5; i++) {
splits.add(new Split("foo", new TestSplitRemote()));
}
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);

// no split should be assigned to the newNode, as it already has maxNodeSplits assigned to it
assertFalse(assignments.keySet().contains(newNode));
@@ -228,7 +229,7 @@ public void testMaxSplitsPerNodePerTask()
for (int i = 0; i < 5; i++) {
splits.add(new Split("foo", new TestSplitRemote()));
}
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);

// no split should be assigned to the newNode, as it already has
// maxSplitsPerNode + maxSplitsPerNodePerTask assigned to it
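
A follow-up test in the style of the cases above could pin down the new skip behavior. This is only a sketch and not part of the commit: the "reportHost" name and the reuse of the nodeManager/nodeTaskMap fixtures from the existing tests are assumptions.

    @Test
    public void testFixedStageSkipsReportNode()
    {
        // Assumes a worker is registered under the hypothetical host name "reportHost".
        NodeSchedulerConfig config = new NodeSchedulerConfig()
                .setReportNodes("reportHost")       // comma-separated host list; "NA" disables the feature
                .setSchedulerFixToReport(false);    // keep FIXED stages off the report node
        NodeScheduler scheduler = new NodeScheduler(nodeManager, config, nodeTaskMap);
        NodeScheduler.NodeSelector selector = scheduler.createNodeSelector("foo");

        // scheduleReportNodeIfNecessary should filter the report node out of the candidates
        List<Node> nodes = selector.selectRandomNodes(10, PlanFragment.PlanDistribution.FIXED);
        for (Node node : nodes) {
            assertFalse(node.getHostAndPort().getHostText().equals("reportHost"));
        }
    }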
