From 32c7b6e86b81816acd9910791e927a181d3f2972 Mon Sep 17 00:00:00 2001
From: yuananf
Date: Thu, 25 Jun 2015 16:40:21 +0800
Subject: [PATCH] Customize fix and source stage nodes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../presto/execution/NodeScheduler.java       | 74 +++++++++++++++++--
 .../presto/execution/NodeSchedulerConfig.java | 41 ++++++++++
 .../presto/execution/SqlStageExecution.java   | 11 ++-
 .../presto/execution/TestNodeScheduler.java   | 17 +++--
 4 files changed, 125 insertions(+), 18 deletions(-)

diff --git a/presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java
index ec26fa3033d7..f2bed5fb336a 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java
@@ -17,6 +17,8 @@
 import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.Node;
 import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.sql.planner.PlanFragment;
+import com.google.common.base.Splitter;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
@@ -62,6 +64,11 @@ public class NodeScheduler
     private final NodeTaskMap nodeTaskMap;
     private final boolean doubleScheduling;
 
+    private final String reportNodes;
+    private final List<String> reportNodeList;
+    private final boolean schedulerFixToReport;
+    private final boolean schedulerSourceToReport;
+
     @Inject
     public NodeScheduler(NodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap)
     {
@@ -75,6 +82,11 @@ public NodeScheduler(NodeManager nodeManager, NodeSchedulerConfig config, NodeTa
         this.maxSplitsPerNodePerTaskWhenFull = config.getMaxPendingSplitsPerNodePerTask();
         this.nodeTaskMap = checkNotNull(nodeTaskMap, "nodeTaskMap is null");
         checkArgument(maxSplitsPerNode > maxSplitsPerNodePerTaskWhenFull, "maxSplitsPerNode must be > maxSplitsPerNodePerTaskWhenFull");
+
+        this.reportNodes = config.getReportNodes();
+        this.schedulerFixToReport = config.getSchedulerFixToReport();
+        this.schedulerSourceToReport = config.getSchedulerSourceToReport();
+        this.reportNodeList = Splitter.on(",").splitToList(reportNodes);
     }
 
     @Managed
@@ -165,18 +177,22 @@ public Node selectCurrentNode()
             return nodeManager.getCurrentNode();
         }
 
-        public List<Node> selectRandomNodes(int limit)
+        public List<Node> selectRandomNodes(int limit, final PlanFragment.PlanDistribution pd)
         {
-            return selectNodes(limit, randomizedNodes());
+            return selectNodes(limit, randomizedNodes(), pd);
         }
 
-        private List<Node> selectNodes(int limit, Iterator<Node> candidates)
+        private List<Node> selectNodes(int limit, Iterator<Node> candidates, final PlanFragment.PlanDistribution pd)
         {
             checkArgument(limit > 0, "limit must be at least 1");
 
             List<Node> selected = new ArrayList<>(limit);
             while (selected.size() < limit && candidates.hasNext()) {
-                selected.add(candidates.next());
+                Node node = candidates.next();
+                if (scheduleReportNodeIfNecessary(node, pd)) {
+                    continue;
+                }
+                selected.add(node);
             }
 
             if (doubleScheduling && !selected.isEmpty()) {
@@ -194,6 +210,40 @@ private List<Node> selectNodes(int limit, Iterator<Node> candidates)
             return selected;
         }
 
+        public List<Node> selectReportNode(int limit)
+        {
+            if (NodeSchedulerConfig.REPORT_NODE_DEFAULT.equals(reportNodes)) {
+                return selectRandomNodes(limit, PlanFragment.PlanDistribution.SINGLE);
+            }
+            else {
+                checkArgument(limit > 0, "limit must be at least 1");
+                ImmutableList<Node> nodes = nodeMap.get().get().getNodesByHostAndPort().values().stream()
+                        .filter(node -> reportNodeList.contains(node.getHostAndPort().getHostText()))
+                        .collect(toImmutableList());
+                return selectNodes(limit, new ResettableRandomizedIterator<>(nodes), PlanFragment.PlanDistribution.SINGLE);
+            }
+        }
+
+        /**
+         * check whether should schedule the task to report node
+         */
+        public boolean scheduleReportNodeIfNecessary(Node node, final PlanFragment.PlanDistribution pd)
+        {
+            boolean skipFlag = false;
+            if (NodeSchedulerConfig.REPORT_NODE_DEFAULT.equals(reportNodes)) {
+                return skipFlag;
+            }
+            else {
+                if (reportNodeList != null && reportNodeList.contains(node.getHostAndPort().getHostText())) {
+                    if ((!schedulerFixToReport && PlanFragment.PlanDistribution.FIXED == pd)
+                            || (!schedulerSourceToReport && PlanFragment.PlanDistribution.SOURCE == pd)) {
+                        skipFlag = true;
+                    }
+                }
+                return skipFlag;
+            }
+        }
+
         /**
          * Identifies the nodes for running the specified splits.
          *
@@ -202,7 +252,7 @@ private List<Node> selectNodes(int limit, Iterator<Node> candidates)
          * If we cannot find an assignment for a split, it is not included in the map.
          */
        public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks,
-                boolean controlScanConcurrencyEnabled, int scanConcurrencyCount)
+                final PlanFragment.PlanDistribution pd, boolean controlScanConcurrencyEnabled, int scanConcurrencyCount)
         {
             Multimap<Node, Split> assignment = HashMultimap.create();
             Map<Node, Integer> assignmentCount = new HashMap<>();
@@ -230,11 +280,11 @@ public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable
                 List<Node> candidateNodes;
                 if (locationAwareScheduling || !split.isRemotelyAccessible()) {
-                    candidateNodes = selectCandidateNodes(nodeMap.get().get(), split);
+                    candidateNodes = selectCandidateNodes(nodeMap.get().get(), split, pd);
                 }
                 else {
                     randomCandidates.reset();
-                    candidateNodes = selectNodes(minCandidates, randomCandidates);
+                    candidateNodes = selectNodes(minCandidates, randomCandidates, pd);
                 }
                 checkCondition(!candidateNodes.isEmpty(), NO_NODES_AVAILABLE, "No nodes available to run query");
@@ -288,7 +338,7 @@ private ResettableRandomizedIterator<Node> randomizedNodes()
             return new ResettableRandomizedIterator<>(nodes);
         }
 
-        private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
+        private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split, final PlanFragment.PlanDistribution pd)
         {
             Set<Node> chosen = new LinkedHashSet<>(minCandidates);
             String coordinatorIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();
@@ -297,6 +347,7 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
             for (HostAddress hint : split.getAddresses()) {
                 nodeMap.getNodesByHostAndPort().get(hint).stream()
                         .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
+                        .filter(node -> !scheduleReportNodeIfNecessary(node, pd))
                         .filter(chosen::add)
                         .forEach(node -> scheduleLocal.incrementAndGet());
 
@@ -314,6 +365,7 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
                 if (!hint.hasPort() || split.isRemotelyAccessible()) {
                     nodeMap.getNodesByHost().get(address).stream()
                             .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
+                            .filter(node -> !scheduleReportNodeIfNecessary(node, pd))
                             .filter(chosen::add)
                             .forEach(node -> scheduleLocal.incrementAndGet());
                 }
@@ -332,6 +384,9 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
                     }
                     for (Node node : nodeMap.getNodesByRack().get(Rack.of(address))) {
                         if (includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
+                            if (scheduleReportNodeIfNecessary(node, pd)) {
+                                continue;
+                            }
                             if (chosen.add(node)) {
                                 scheduleRack.incrementAndGet();
                             }
@@ -352,6 +407,9 @@ private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
                 ResettableRandomizedIterator<Node> randomizedIterator = randomizedNodes();
                 while (randomizedIterator.hasNext()) {
                     Node node = randomizedIterator.next();
+                    if (scheduleReportNodeIfNecessary(node, pd)) {
+                        continue;
+                    }
                     if (chosen.add(node)) {
                         scheduleRandom.incrementAndGet();
                     }
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/NodeSchedulerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/NodeSchedulerConfig.java
index fbfbdae60254..723ed7af94b0 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/NodeSchedulerConfig.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/NodeSchedulerConfig.java
@@ -27,6 +27,11 @@ public class NodeSchedulerConfig
     private int maxSplitsPerNode = 100;
     private int maxPendingSplitsPerNodePerTask = 10;
 
+    public static final String REPORT_NODE_DEFAULT = "NA";
+    private String reportNodes = REPORT_NODE_DEFAULT;
+    private boolean schedulerFixToReport = true;
+    private boolean schedulerSourceToReport = true;
+
     public boolean isMultipleTasksPerNodeEnabled()
     {
         return multipleTasksPerNode;
@@ -100,4 +105,40 @@ public NodeSchedulerConfig setMaxSplitsPerNode(int maxSplitsPerNode)
         this.maxSplitsPerNode = maxSplitsPerNode;
         return this;
     }
+
+    public String getReportNodes()
+    {
+        return reportNodes;
+    }
+
+    @Config("node-scheduler.report-nodes")
+    public NodeSchedulerConfig setReportNodes(String reportNodes)
+    {
+        this.reportNodes = reportNodes;
+        return this;
+    }
+
+    public boolean getSchedulerFixToReport()
+    {
+        return schedulerFixToReport;
+    }
+
+    @Config("node-scheduler.scheduler-fix-to-report")
+    public NodeSchedulerConfig setSchedulerFixToReport(boolean schedulerFixToReport)
+    {
+        this.schedulerFixToReport = schedulerFixToReport;
+        return this;
+    }
+
+    public boolean getSchedulerSourceToReport()
+    {
+        return schedulerSourceToReport;
+    }
+
+    @Config("node-scheduler.scheduler-source-to-report")
+    public NodeSchedulerConfig setSchedulerSourceToReport(boolean schedulerSourceToReport)
+    {
+        this.schedulerSourceToReport = schedulerSourceToReport;
+        return this;
+    }
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java
index 98bace1c6bff..4190924f4ad4 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java
@@ -440,7 +440,13 @@ else if (fragment.getDistribution() == PlanDistribution.COORDINATOR_ONLY) {
     private void scheduleFixedNodeCount(int nodeCount)
     {
         // create tasks on "nodeCount" random nodes
-        List<Node> nodes = nodeSelector.selectRandomNodes(nodeCount);
+        List<Node> nodes;
+        if (nodeCount == 1) {
+            nodes = nodeSelector.selectReportNode(nodeCount);
+        }
+        else {
+            nodes = nodeSelector.selectRandomNodes(nodeCount, PlanDistribution.FIXED);
+        }
         checkCondition(!nodes.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
         ImmutableList.Builder<TaskId> tasks = ImmutableList.builder();
         for (int taskId = 0; taskId < nodes.size(); taskId++) {
@@ -487,7 +493,8 @@ private void scheduleSourcePartitionedNodes()
                 stateMachine.recordGetSplitTime(start);
 
                 while (!pendingSplits.isEmpty() && !getState().isDone()) {
-                    Multimap<Node, Split> splitAssignment = nodeSelector.computeAssignments(pendingSplits, tasks.values(), controlScanConcurrencyEnabled, scanConcurrencyCount);
+                    Multimap<Node, Split> splitAssignment = nodeSelector.computeAssignments(pendingSplits, tasks.values(),
+                            PlanDistribution.SOURCE, controlScanConcurrencyEnabled, scanConcurrencyCount);
 
                     pendingSplits = ImmutableSet.copyOf(Sets.difference(pendingSplits, ImmutableSet.copyOf(splitAssignment.values())));
                     assignSplits(nextTaskId, splitAssignment);
diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
index abc0bcf26b13..3ecdc31c3ac1 100644
--- a/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
+++ b/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
@@ -20,6 +20,7 @@
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.Node;
+import com.facebook.presto.sql.planner.PlanFragment;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -100,7 +101,7 @@ public void testLocationAwareSchedulingDisabledScheduleLocal()
         Split split = new Split("foo", new TestSplitLocal());
         Set<Split> splits = ImmutableSet.of(split);
 
-        Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(selector.computeAssignments(splits, taskMap.values(), false, 1).entries());
+        Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(selector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1).entries());
         assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
         assertEquals(assignment.getValue(), split);
     }
@@ -112,7 +113,7 @@ public void testScheduleLocal()
         Split split = new Split("foo", new TestSplitLocal());
         Set<Split> splits = ImmutableSet.of(split);
 
-        Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(nodeSelector.computeAssignments(splits, taskMap.values(), false, 1).entries());
+        Map.Entry<Node, Split> assignment = Iterables.getOnlyElement(nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1).entries());
         assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
         assertEquals(assignment.getValue(), split);
     }
@@ -127,13 +128,13 @@ public void testMultipleTasksPerNode()
         NodeScheduler nodeScheduler = new NodeScheduler(nodeManager, nodeSchedulerConfig, nodeTaskMap);
         NodeScheduler.NodeSelector nodeSelector = nodeScheduler.createNodeSelector("foo");
-        List<Node> nodes = nodeSelector.selectRandomNodes(10);
+        List<Node> nodes = nodeSelector.selectRandomNodes(10, PlanFragment.PlanDistribution.FIXED);
         assertEquals(nodes.size(), 3);
 
         nodeSchedulerConfig.setMultipleTasksPerNodeEnabled(true);
         nodeScheduler = new NodeScheduler(nodeManager, nodeSchedulerConfig, nodeTaskMap);
         nodeSelector = nodeScheduler.createNodeSelector("foo");
-        nodes = nodeSelector.selectRandomNodes(9);
+        nodes = nodeSelector.selectRandomNodes(9, PlanFragment.PlanDistribution.FIXED);
         assertEquals(nodes.size(), 9);
         Map<String, Integer> counts = new HashMap<>();
         for (Node node : nodes) {
@@ -151,7 +152,7 @@ public void testScheduleRemote()
     {
         Set<Split> splits = new HashSet<>();
         splits.add(new Split("foo", new TestSplitRemote()));
-        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
+        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);
         assertEquals(assignments.size(), 1);
     }
 
@@ -164,7 +165,7 @@ public void testBasicAssignment()
         for (int i = 0; i < 3; i++) {
             splits.add(new Split("foo", new TestSplitRemote()));
         }
-        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
+        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);
         assertEquals(assignments.entries().size(), 3);
         for (Node node : nodeManager.getActiveDatasourceNodes("foo")) {
             assertTrue(assignments.keySet().contains(node));
@@ -194,7 +195,7 @@ public void testMaxSplitsPerNode()
         for (int i = 0; i < 5; i++) {
             splits.add(new Split("foo", new TestSplitRemote()));
         }
-        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
+        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);
 
         // no split should be assigned to the newNode, as it already has maxNodeSplits assigned to it
         assertFalse(assignments.keySet().contains(newNode));
@@ -228,7 +229,7 @@ public void testMaxSplitsPerNodePerTask()
         for (int i = 0; i < 5; i++) {
             splits.add(new Split("foo", new TestSplitRemote()));
         }
-        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), false, 1);
+        Multimap<Node, Split> assignments = nodeSelector.computeAssignments(splits, taskMap.values(), PlanFragment.PlanDistribution.SOURCE, false, 1);
 
         // no split should be assigned to the newNode, as it already has
         // maxSplitsPerNode + maxSplitsPerNodePerTask assigned to it
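
Reviewer note, not part of the patch: the behavior is driven by three new configuration properties defined in NodeSchedulerConfig (node-scheduler.report-nodes, node-scheduler.scheduler-fix-to-report, node-scheduler.scheduler-source-to-report). The minimal sketch below shows one way the new setters could be exercised; the class name ReportNodeConfigExample and the host names are invented for illustration, while the methods and the "NA" default come from the code added above.

    import com.facebook.presto.execution.NodeSchedulerConfig;

    // Hypothetical example, not part of the patch.
    public class ReportNodeConfigExample
    {
        public static void main(String[] args)
        {
            // In a deployment these values would normally be bound from the
            // node-scheduler.* properties via the @Config-annotated setters.
            NodeSchedulerConfig config = new NodeSchedulerConfig()
                    .setReportNodes("report-host-1,report-host-2") // comma-separated hosts reserved as report nodes
                    .setSchedulerFixToReport(false)                // keep FIXED-distribution tasks off the report nodes
                    .setSchedulerSourceToReport(false);            // keep SOURCE-distribution tasks off the report nodes

            // With report-nodes set to something other than the "NA" default,
            // selectReportNode() places single-node stages on these hosts, and
            // scheduleReportNodeIfNecessary() skips them for FIXED and SOURCE
            // stages when the two booleans above are false.
            System.out.println("report nodes: " + config.getReportNodes());
        }
    }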