[core][scheduler] simplify and improve scheduler #6867

Merged: 9 commits, Aug 1, 2024
Changes from 1 commit
remove policy
youkaichao committed Jul 27, 2024
commit 18f8c03a1539ad4072d6c66b978e6bbd11e7783d
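For context, the import removed in the first hunk below pointed at vllm/core/policy.py, which supplied the FCFS queue-sorting abstraction that the scheduler no longer needs after this commit. A rough sketch of that abstraction, reconstructed here for illustration rather than quoted from the deleted file (the seq_group argument is assumed to expose metrics.arrival_time):

# Illustrative sketch of the removed vllm.core.policy abstraction (not the
# verbatim module). Its job was to re-sort a scheduling queue every step.
from collections import deque
from typing import Deque


class Policy:
    def get_priority(self, now: float, seq_group) -> float:
        raise NotImplementedError

    def sort_by_priority(self, now: float, seq_groups: Deque) -> Deque:
        # O(n log n) re-sort of the whole queue on each scheduling pass.
        return deque(
            sorted(seq_groups,
                   key=lambda g: self.get_priority(now, g),
                   reverse=True))


class FCFS(Policy):
    def get_priority(self, now: float, seq_group) -> float:
        # Earlier arrivals get a larger priority value: first come, first served.
        return now - seq_group.metrics.arrival_time


class PolicyFactory:
    _POLICY_REGISTRY = {"fcfs": FCFS}

    @classmethod
    def get_policy(cls, policy_name: str, **kwargs) -> Policy:
        return cls._POLICY_REGISTRY[policy_name](**kwargs)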
23 changes: 4 additions & 19 deletions vllm/core/scheduler.py
@@ -8,7 +8,6 @@

 from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
 from vllm.core.interfaces import AllocStatus, BlockSpaceManager
-from vllm.core.policy import Policy, PolicyFactory
 from vllm.logger import init_logger
 from vllm.lora.request import LoRARequest
 from vllm.prompt_adapter.request import PromptAdapterRequest
@@ -399,7 +398,6 @@ def _schedule_running(
         running_queue: deque,
         budget: SchedulingBudget,
         curr_loras: Optional[Set[int]],
-        policy: Policy,
         enable_chunking: bool = False,
     ) -> Tuple[deque, SchedulerRunningOutputs]:
         """Schedule sequence groups that are running.
@@ -413,7 +411,6 @@
                 when any decodes are preempted.
             curr_loras: Currently batched lora request ids. The argument is
                 in-place updated when any decodes are preempted.
-            policy: The sorting policy to sort running_queue.
             enable_chunking: If True, seq group can be chunked and only a
                 chunked number of tokens are scheduled if
                 `budget.num_batched_tokens` has not enough capacity to schedule
@@ -513,7 +510,6 @@ def _schedule_swapped(
         swapped_queue: deque,
         budget: SchedulingBudget,
         curr_loras: Optional[Set[int]],
-        policy: Policy,
         enable_chunking: bool = False,
     ) -> Tuple[deque, SchedulerSwappedInOutputs]:
         """Schedule sequence groups that are swapped out.
@@ -529,7 +525,6 @@
                 when any requests are swapped in.
             curr_loras: Currently batched lora request ids. The argument is
                 in-place updated when any requests are swapped in.
-            policy: The sorting policy to sort swapped_queue.
             enable_chunking: If True, seq group can be chunked and only a
                 chunked number of tokens are scheduled if
                 `budget.num_batched_tokens` has not enough capacity to schedule
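Taken together, the signature and docstring hunks above leave both helpers with one parameter fewer. As a sketch, the signatures implied by this diff after the change look roughly as follows (self is assumed because these are Scheduler methods; bodies and docstrings are elided):

# Sketch of the post-commit signatures implied by the hunks above; `self` is
# assumed and bodies/docstrings are omitted. The annotation types are the ones
# already referenced in this file's diff context.
def _schedule_running(
    self,
    running_queue: deque,
    budget: SchedulingBudget,
    curr_loras: Optional[Set[int]],
    enable_chunking: bool = False,
) -> Tuple[deque, SchedulerRunningOutputs]:
    ...


def _schedule_swapped(
    self,
    swapped_queue: deque,
    budget: SchedulingBudget,
    curr_loras: Optional[Set[int]],
    enable_chunking: bool = False,
) -> Tuple[deque, SchedulerSwappedInOutputs]:
    ...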
@@ -787,24 +782,19 @@ def _schedule_default(self) -> SchedulerOutputs:
         remaining_waiting, prefills = self._schedule_prefills(
             self.waiting, budget, curr_loras, enable_chunking=False)

-        fcfs_policy = PolicyFactory.get_policy(policy_name="fcfs")
         # Don't schedule decodes if prefills are scheduled.
         # NOTE: If `_schedule_prefills` doesn't enable chunking, self.running
         # only contains decode requests, not chunked prefills.
         if len(prefills.seq_groups) == 0:
             remaining_running, running_scheduled = self._schedule_running(
-                self.running,
-                budget,
-                curr_loras,
-                fcfs_policy,
-                enable_chunking=False)
+                self.running, budget, curr_loras, enable_chunking=False)

             # If any sequence group is preempted, do not swap in any sequence
             # group. because it means there's no slot for new running requests.
             if len(running_scheduled.preempted) + len(
                     running_scheduled.swapped_out) == 0:
                 remaining_swapped, swapped_in = self._schedule_swapped(
-                    self.swapped, budget, curr_loras, fcfs_policy)
+                    self.swapped, budget, curr_loras)

         assert (budget.num_batched_tokens <=
                 self.scheduler_config.max_num_batched_tokens)
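The hunk above also drops the per-call PolicyFactory.get_policy(policy_name="fcfs") lookup in _schedule_default. One way to read the simplification: if the running and swapped queues are kept in arrival order (requests appended as they arrive), an explicit FCFS sort of those queues each step changes nothing and only adds O(n log n) work. A toy illustration of that property, using hypothetical (id, arrival_time) tuples rather than real vLLM objects:

# Toy illustration (hypothetical data, not vLLM code): a queue appended to in
# arrival order is already in FCFS order, so sorting it by arrival time again
# is a no-op.
from collections import deque

queue = deque()
for i, arrival_time in enumerate([0.10, 0.25, 0.40]):
    queue.append((f"request-{i}", arrival_time))  # appended as requests arrive

fcfs_order = sorted(queue, key=lambda item: item[1])
assert list(queue) == fcfs_order  # the explicit sort changed nothing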
@@ -875,20 +865,15 @@ def _schedule_chunked_prefill(self):
             self.swapped, SchedulerSwappedInOutputs.create_empty())

         # Decoding should be always scheduled first by fcfs.
-        fcfs_policy = PolicyFactory.get_policy(policy_name="fcfs")
         remaining_running, running_scheduled = self._schedule_running(
-            self.running,
-            budget,
-            curr_loras,
-            fcfs_policy,
-            enable_chunking=True)
+            self.running, budget, curr_loras, enable_chunking=True)

         # Schedule swapped out requests.
         # If preemption happens, it means we don't have space for swap-in.
         if len(running_scheduled.preempted) + len(
                 running_scheduled.swapped_out) == 0:
             remaining_swapped, swapped_in = self._schedule_swapped(
-                self.swapped, budget, curr_loras, fcfs_policy)
+                self.swapped, budget, curr_loras)

         # Schedule new prefills.
         remaining_waiting, prefills = self._schedule_prefills(