Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mszu/merged scheduler #799

Draft
wants to merge 1 commit into
base: mlperf_features
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions vllm/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ def _schedule_priority_preemption(
running_queue[-1]) > self._get_priority(seq_group):
#Only preempt if waiting sequence cannot be allocated
can_allocate = self.block_manager.can_allocate(seq_group)
logger.warning("schedule_priority_preemption")
if (num_new_tokens_uncached > 0
and can_allocate == AllocStatus.OK
and budget.can_schedule(
Expand Down Expand Up @@ -981,6 +982,104 @@ def _schedule_prefills(
budget: SchedulingBudget,
curr_loras: Optional[Set[int]],
enable_chunking: bool = False,
) -> SchedulerPrefillOutputs:
ignored_seq_groups: List[SequenceGroup] = []
seq_groups: List[ScheduledSequenceGroup] = []

waiting_queue = self.waiting

leftover_waiting_sequences: Deque[SequenceGroup] = deque()
if not self._passed_delay(time.time()):
# Queue requests that couldn't be scheduled.
waiting_queue.extendleft(leftover_waiting_sequences)
if len(seq_groups) > 0:
self.prev_prompt = True

return SchedulerPrefillOutputs(
seq_groups=seq_groups,
ignored_seq_groups=ignored_seq_groups,
num_lookahead_slots=self._get_num_lookahead_slots(
is_prefill=True, enable_chunking=enable_chunking))

blocks_needed = 0
end_of_batch_index = -1
num_free_gpu_blocks = self.block_manager.block_allocator.get_num_free_blocks(device=Device.GPU) - 96 # TODO Tune the reserve
total_length = 0
# TODO Refactor this code (the logic is the same)
for idx in range(len(waiting_queue)):
seq_group = waiting_queue[idx]
seq_len = seq_group.get_seqs(status=SequenceStatus.WAITING)[0].get_len()
if seq_len == 1:
end_of_batch_index = idx
break
total_length += seq_len
blocks_needed += (seq_len+1) // 128 + 1 # TODO FIXME
budget_can_schedule = budget.can_schedule(num_new_tokens = total_length, num_new_seqs = end_of_batch_index) if end_of_batch_index >= 0 else False

can_schedule_1k = budget_can_schedule and num_free_gpu_blocks > blocks_needed

if can_schedule_1k:
end_of_batch_index_2k = -1
for idx in range(end_of_batch_index+1, len(waiting_queue)):
seq_group = waiting_queue[idx]
seq_len = seq_group.get_seqs(status=SequenceStatus.WAITING)[0].get_len()
if seq_len == 1:
end_of_batch_index_2k = idx
break
total_length += seq_len
blocks_needed += (seq_len+1) // 128 + 1 # TODO FIXME
budget_can_schedule = budget.can_schedule(num_new_tokens = total_length, num_new_seqs = end_of_batch_index_2k) if end_of_batch_index_2k >= 0 else False
can_schedule_2k = budget_can_schedule and num_free_gpu_blocks > blocks_needed
if can_schedule_2k:
end_of_batch_index = end_of_batch_index_2k

if can_schedule_1k:
for idx in range(end_of_batch_index + 1):
seq_group = waiting_queue[0]
length = seq_group.get_seqs(status=SequenceStatus.WAITING)[0].get_len()

if length == 1:
waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
waiting_seqs[0].status = SequenceStatus.FINISHED_IGNORED
ignored_seq_groups.append(seq_group)
else:
self._allocate_and_set_running(seq_group)

seq_group.init_multi_step_from_lookahead_slots(
num_lookahead_slots=0,
num_scheduler_steps=self.scheduler_config.
num_scheduler_steps,
is_multi_step=self.scheduler_config.is_multi_step,
enable_chunking=enable_chunking)
seq_groups.append(
ScheduledSequenceGroup(seq_group=seq_group,
token_chunk_size=length))
budget.add_num_batched_tokens(
seq_group.request_id,
num_batched_tokens=length,
num_cached_tokens=0,
)
budget.add_num_seqs(seq_group.request_id, 1)

waiting_queue.popleft()

# Queue requests that couldn't be scheduled.
waiting_queue.extendleft(leftover_waiting_sequences)
if len(seq_groups) > 0:
self.prev_prompt = True

return SchedulerPrefillOutputs(
seq_groups=seq_groups,
ignored_seq_groups=ignored_seq_groups,
num_lookahead_slots=self._get_num_lookahead_slots(
is_prefill=True, enable_chunking=enable_chunking))


def _schedule_prefills_original(
self,
budget: SchedulingBudget,
curr_loras: Optional[Set[int]],
enable_chunking: bool = False,
) -> SchedulerPrefillOutputs:
"""Schedule sequence groups that are in prefill stage.

Expand Down
Loading