Revert "feat: EXC-1735: Charge canisters for full execution (#1782)"
This reverts commit fcb7192.
dsarlis committed Oct 9, 2024
1 parent 35429e6 · commit 6de6148
Showing 4 changed files with 20 additions and 196 deletions.
1 change: 0 additions & 1 deletion rs/execution_environment/benches/scheduler.rs
@@ -37,7 +37,6 @@ fn main() {
let round_schedule = RoundSchedule::new(
scheduler_cores,
long_execution_cores,
0,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
);
53 changes: 18 additions & 35 deletions rs/execution_environment/src/scheduler.rs
@@ -271,8 +271,8 @@ impl SchedulerImpl {
// `(compute_capacity - total_compute_allocation) * multiplier / number_of_canisters`
// can be simplified to just
// `(compute_capacity - total_compute_allocation) * scheduler_cores`
let free_capacity_per_canister = compute_capacity_percent
.saturating_sub(total_compute_allocation_percent)
let free_capacity_per_canister = (compute_capacity_percent
.saturating_sub(total_compute_allocation_percent))
* scheduler_cores as i64;

// Fully divide the free allocation across all canisters.
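The comment in this hunk claims the per-canister free capacity `(compute_capacity - total_compute_allocation) * multiplier / number_of_canisters` reduces to `(compute_capacity - total_compute_allocation) * scheduler_cores`. A minimal sketch of why, assuming `multiplier = scheduler_cores * number_of_canisters` as in the rest of the scheduler (the free-standing function below and its parameter names are illustrative, not part of the diff):

```rust
// Since multiplier = scheduler_cores * number_of_canisters,
//   (capacity - allocation) * multiplier / number_of_canisters
// = (capacity - allocation) * scheduler_cores * number_of_canisters / number_of_canisters
// = (capacity - allocation) * scheduler_cores
fn free_capacity_per_canister(
    compute_capacity_percent: i64,
    total_compute_allocation_percent: i64,
    scheduler_cores: usize,
) -> i64 {
    compute_capacity_percent.saturating_sub(total_compute_allocation_percent)
        * scheduler_cores as i64
}
```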
@@ -344,7 +344,6 @@ impl SchedulerImpl {
let round_schedule = RoundSchedule::new(
scheduler_cores,
long_execution_cores,
total_compute_allocation_percent,
round_states
.iter()
.skip(number_of_long_executions)
@@ -644,7 +643,7 @@ impl SchedulerImpl {
scheduler_round_limits: &mut SchedulerRoundLimits,
registry_settings: &RegistryExecutionSettings,
idkg_subnet_public_keys: &BTreeMap<MasterPublicKeyId, MasterPublicKey>,
) -> (ReplicatedState, BTreeSet<CanisterId>, BTreeSet<CanisterId>) {
) -> (ReplicatedState, BTreeSet<CanisterId>) {
let measurement_scope =
MeasurementScope::nested(&self.metrics.round_inner, root_measurement_scope);
let mut ingress_execution_results = Vec::new();
@@ -655,9 +654,6 @@ impl SchedulerImpl {

let mut heartbeat_and_timer_canister_ids = BTreeSet::new();
let mut round_executed_canister_ids = BTreeSet::new();
// The set of canisters marked as fully executed: have no messages to execute
// or were scheduled first on a core.
let mut round_fully_executed_canister_ids = BTreeSet::new();

// Start iteration loop:
// - Execute subnet messages.
@@ -729,7 +725,6 @@ impl SchedulerImpl {
let (
active_canisters,
executed_canister_ids,
fully_executed_canister_ids,
mut loop_ingress_execution_results,
heap_delta,
) = self.execute_canisters_in_inner_round(
@@ -744,10 +739,9 @@
);
let instructions_consumed = instructions_before - round_limits.instructions;
drop(execution_timer);
round_executed_canister_ids.extend(executed_canister_ids);

let finalization_timer = self.metrics.round_inner_iteration_fin.start_timer();
round_executed_canister_ids.extend(executed_canister_ids);
round_fully_executed_canister_ids.extend(fully_executed_canister_ids);
total_heap_delta += heap_delta;
state.metadata.heap_delta_estimate += heap_delta;

@@ -853,11 +847,7 @@ impl SchedulerImpl {
.heap_delta_rate_limited_canisters_per_round
.observe(round_filtered_canisters.rate_limited_canister_ids.len() as f64);

(
state,
round_filtered_canisters.active_canister_ids,
round_fully_executed_canister_ids,
)
(state, round_filtered_canisters.active_canister_ids)
}

/// Executes canisters in parallel using the thread pool.
@@ -884,7 +874,6 @@
) -> (
Vec<CanisterState>,
BTreeSet<CanisterId>,
Vec<CanisterId>,
Vec<(MessageId, IngressStatus)>,
NumBytes,
) {
@@ -898,7 +887,6 @@
canisters_by_thread.into_iter().flatten().collect(),
BTreeSet::new(),
vec![],
vec![],
NumBytes::from(0),
);
}
@@ -962,15 +950,13 @@ impl SchedulerImpl {
// Aggregate `results_by_thread` to get the result of this function.
let mut canisters = Vec::new();
let mut executed_canister_ids = BTreeSet::new();
let mut fully_executed_canister_ids = vec![];
let mut ingress_results = Vec::new();
let mut total_instructions_executed = NumInstructions::from(0);
let mut max_instructions_executed_per_thread = NumInstructions::from(0);
let mut heap_delta = NumBytes::from(0);
for mut result in results_by_thread.into_iter() {
canisters.append(&mut result.canisters);
executed_canister_ids.extend(result.executed_canister_ids);
fully_executed_canister_ids.extend(result.fully_executed_canister_ids);
ingress_results.append(&mut result.ingress_results);
let instructions_executed = as_num_instructions(
round_limits_per_thread.instructions - result.round_limits.instructions,
@@ -1004,7 +990,6 @@ impl SchedulerImpl {
(
canisters,
executed_canister_ids,
fully_executed_canister_ids,
ingress_results,
heap_delta,
)
@@ -1666,7 +1651,7 @@ impl Scheduler for SchedulerImpl {
};

// Inner round.
let (mut state, active_canister_ids, fully_executed_canister_ids) = self.inner_round(
let (mut state, active_canister_ids) = self.inner_round(
state,
&mut csprng,
&round_schedule,
@@ -1827,10 +1812,6 @@ impl Scheduler for SchedulerImpl {
.num_canister_snapshots
.set(final_state.canister_snapshots.count() as i64);
}
round_schedule.finish_round(
&mut final_state.canister_states,
fully_executed_canister_ids,
);
self.finish_round(&mut final_state, current_round_type);
final_state
.metadata
@@ -1906,7 +1887,6 @@ fn observe_instructions_consumed_per_message(
struct ExecutionThreadResult {
canisters: Vec<CanisterState>,
executed_canister_ids: BTreeSet<CanisterId>,
fully_executed_canister_ids: Vec<CanisterId>,
ingress_results: Vec<(MessageId, IngressStatus)>,
slices_executed: NumSlices,
messages_executed: NumMessages,
@@ -1944,7 +1924,6 @@ fn execute_canisters_on_thread(
// These variables accumulate the results and will be returned at the end.
let mut canisters = vec![];
let mut executed_canister_ids = BTreeSet::new();
let mut fully_executed_canister_ids = vec![];
let mut ingress_results = vec![];
let mut total_slices_executed = NumSlices::from(0);
let mut total_messages_executed = NumMessages::from(0);
@@ -2057,13 +2036,18 @@ fn execute_canisters_on_thread(
if let Some(es) = &mut canister.execution_state {
es.last_executed_round = round_id;
}
RoundSchedule::finish_canister_execution(
&mut canister,
&mut fully_executed_canister_ids,
round_id,
is_first_iteration,
rank,
);
let full_message_execution = match canister.next_execution() {
NextExecution::None => true,
NextExecution::StartNew => false,
// We just finished a full slice of executions.
NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true,
};
let scheduled_first = is_first_iteration && rank == 0;
if full_message_execution || scheduled_first {
// The very first canister is considered to have a full execution round for
// scheduling purposes even if it did not complete within the round.
canister.scheduler_state.last_full_execution_round = round_id;
}
canister.system_state.canister_metrics.executed += 1;
canisters.push(canister);
round_limits.instructions -=
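The `full_message_execution` / `scheduled_first` logic above decides whether a canister counts as having had a full execution round for scheduling. A stand-alone sketch of that predicate, using a hypothetical copy of the `NextExecution` variants (the real enum lives in `ic_replicated_state::canister_state`):

```rust
/// Hypothetical stand-in for ic_replicated_state::canister_state::NextExecution.
enum NextExecution {
    None,
    StartNew,
    ContinueLong,
    ContinueInstallCode,
}

/// Mirrors the logic above: a canister gets credit for a full execution round
/// if it has nothing left to run (or just finished a full slice of a long
/// execution), or if it was scheduled first on a core in the first iteration.
fn counts_as_full_execution(next: &NextExecution, is_first_iteration: bool, rank: usize) -> bool {
    let full_message_execution = match next {
        NextExecution::None => true,
        NextExecution::StartNew => false,
        // We just finished a full slice of executions.
        NextExecution::ContinueLong | NextExecution::ContinueInstallCode => true,
    };
    let scheduled_first = is_first_iteration && rank == 0;
    full_message_execution || scheduled_first
}
```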
@@ -2073,7 +2057,6 @@
ExecutionThreadResult {
canisters,
executed_canister_ids,
fully_executed_canister_ids,
ingress_results,
slices_executed: total_slices_executed,
messages_executed: total_messages_executed,
69 changes: 2 additions & 67 deletions rs/execution_environment/src/scheduler/round_schedule.rs
@@ -1,9 +1,9 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeMap, HashMap};

use ic_base_types::{CanisterId, NumBytes};
use ic_config::flag_status::FlagStatus;
use ic_replicated_state::{canister_state::NextExecution, CanisterState};
use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, LongExecutionMode};
use ic_types::{AccumulatedPriority, ComputeAllocation, LongExecutionMode};

/// Round metrics required to prioritize a canister.
#[derive(Clone, Debug)]
@@ -42,8 +42,6 @@ pub struct RoundSchedule {
pub scheduler_cores: usize,
/// Number of cores dedicated for long executions.
pub long_execution_cores: usize,
// Sum of all canisters compute allocation in percent.
pub total_compute_allocation_percent: i64,
/// Ordered Canister IDs with new executions.
pub ordered_new_execution_canister_ids: Vec<CanisterId>,
/// Ordered Canister IDs with long executions.
@@ -54,15 +52,13 @@ impl RoundSchedule {
pub fn new(
scheduler_cores: usize,
long_execution_cores: usize,
total_compute_allocation_percent: i64,
ordered_new_execution_canister_ids: Vec<CanisterId>,
ordered_long_execution_canister_ids: Vec<CanisterId>,
) -> Self {
RoundSchedule {
scheduler_cores,
long_execution_cores: long_execution_cores
.min(ordered_long_execution_canister_ids.len()),
total_compute_allocation_percent,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
}
@@ -178,7 +174,6 @@ impl RoundSchedule {
RoundSchedule::new(
self.scheduler_cores,
self.long_execution_cores,
self.total_compute_allocation_percent,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
),
@@ -234,64 +229,4 @@ impl RoundSchedule {

(canisters_partitioned_by_cores, canisters)
}

pub fn finish_canister_execution(
canister: &mut CanisterState,
fully_executed_canister_ids: &mut Vec<CanisterId>,
round_id: ExecutionRound,
is_first_iteration: bool,
rank: usize,
) {
let full_message_execution = match canister.next_execution() {
NextExecution::None => true,
NextExecution::StartNew => false,
// We just finished a full slice of executions.
NextExecution::ContinueLong => true,
NextExecution::ContinueInstallCode => false,
};
let scheduled_first = is_first_iteration && rank == 0;

// The very first canister is considered to have a full execution round for
// scheduling purposes even if it did not complete within the round.
if full_message_execution || scheduled_first {
canister.scheduler_state.last_full_execution_round = round_id;

// We schedule canisters (as opposed to individual messages),
// and we charge for every full execution round.
fully_executed_canister_ids.push(canister.canister_id());
}
}

pub(crate) fn finish_round(
&self,
canister_states: &mut BTreeMap<CanisterId, CanisterState>,
fully_executed_canister_ids: BTreeSet<CanisterId>,
) {
let scheduler_cores = self.scheduler_cores;
let number_of_canisters = canister_states.len();
let multiplier = (scheduler_cores * number_of_canisters).max(1) as i64;

// Charge canisters for full executions in this round.
let mut total_charged_priority = 0;
for canister_id in fully_executed_canister_ids {
if let Some(canister) = canister_states.get_mut(&canister_id) {
total_charged_priority += 100 * multiplier;
canister.scheduler_state.priority_credit += (100 * multiplier).into();
}
}

let total_allocated = self.total_compute_allocation_percent * multiplier;
// Free capacity per canister in multiplied percent.
let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated)
/ number_of_canisters.max(1) as i64;
// Fully divide the free allocation across all canisters.
for canister in canister_states.values_mut() {
// De-facto compute allocation includes bonus allocation
let factual = canister.scheduler_state.compute_allocation.as_percent() as i64
* multiplier
+ free_capacity_per_canister;
// Increase accumulated priority by de-facto compute allocation.
canister.scheduler_state.accumulated_priority += factual.into();
}
}
}
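For reference, the `finish_round` method deleted above charged fully executed canisters and spread the charged-minus-allocated surplus back as bonus accumulated priority. A simplified, self-contained sketch of that bookkeeping (the struct below is a stand-in for the relevant `CanisterState` scheduler fields, not the real type):

```rust
/// Simplified stand-in for the scheduler fields touched by the deleted code.
struct SchedulerState {
    compute_allocation_percent: i64,
    accumulated_priority: i64,
    priority_credit: i64,
}

/// Sketch of the deleted charging pass: fully executed canisters are charged
/// 100 * multiplier priority credit, and the charged-minus-allocated surplus is
/// divided evenly across all canisters as bonus accumulated priority.
fn finish_round_sketch(
    canisters: &mut [SchedulerState],
    fully_executed: &[usize], // indices of fully executed canisters
    scheduler_cores: usize,
    total_compute_allocation_percent: i64,
) {
    let number_of_canisters = canisters.len();
    let multiplier = (scheduler_cores * number_of_canisters).max(1) as i64;

    // Charge canisters for full executions in this round.
    let mut total_charged_priority = 0;
    for &i in fully_executed {
        total_charged_priority += 100 * multiplier;
        canisters[i].priority_credit += 100 * multiplier;
    }

    let total_allocated = total_compute_allocation_percent * multiplier;
    // Free capacity per canister in multiplied percent.
    let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated)
        / number_of_canisters.max(1) as i64;

    // Fully divide the free allocation across all canisters.
    for canister in canisters.iter_mut() {
        // De-facto compute allocation includes the bonus allocation.
        let factual =
            canister.compute_allocation_percent * multiplier + free_capacity_per_canister;
        canister.accumulated_priority += factual;
    }
}
```

In the deleted proptest further down, canisters keep the default compute allocation of 0, so `total_allocated` is 0 and the whole charge flows back through the bonus term.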
93 changes: 0 additions & 93 deletions rs/execution_environment/src/scheduler/tests.rs
@@ -5921,96 +5921,3 @@ fn inner_round_long_execution_is_a_full_execution() {
// The accumulated priority invariant should be respected.
assert_eq!(total_accumulated_priority - total_priority_credit, 0);
}

#[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })]
fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: usize) {
let instructions = 20;
let messages_per_round = 2;
let mut test = SchedulerTestBuilder::new()
.with_scheduler_config(SchedulerConfig {
scheduler_cores,
max_instructions_per_round: (instructions * messages_per_round).into(),
max_instructions_per_message: instructions.into(),
max_instructions_per_message_without_dts: instructions.into(),
max_instructions_per_slice: instructions.into(),
instruction_overhead_per_execution: 0.into(),
instruction_overhead_per_canister: 0.into(),
instruction_overhead_per_canister_for_finalization: 0.into(),
..SchedulerConfig::application_subnet()
})
.build();

// Bump up the round number.
test.execute_round(ExecutionRoundType::OrdinaryRound);

// Create `messages_per_round * 2` canisters for each scheduler core.
let num_canisters = scheduler_cores as u64 * messages_per_round * 2;
let mut canister_ids = vec![];
for _ in 0..num_canisters {
let canister_id = test.create_canister();
// Send one message per canister. Having `messages_per_round * 2` canisters per core,
// only half of them will finish in one round.
test.send_ingress(canister_id, ingress(instructions));
canister_ids.push(canister_id);
}

test.execute_round(ExecutionRoundType::OrdinaryRound);

let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
for (i, canister) in test.state().canisters_iter().enumerate() {
if i < num_canisters as usize / 2 {
// The first half of the canisters should finish their messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
prop_assert_eq!(
canister.scheduler_state.last_full_execution_round,
test.last_round()
);
} else {
// The second half of the canisters should still have their messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
prop_assert_eq!(canister.system_state.canister_metrics.executed, 0);
prop_assert_eq!(canister.scheduler_state.last_full_execution_round, 0.into());
}
total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
total_priority_credit += canister.scheduler_state.priority_credit.get();
}
prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);

// Send one more message for first half of the canisters.
for (i, canister) in canister_ids.iter().enumerate() {
if i < num_canisters as usize / 2 {
test.send_ingress(*canister, ingress(instructions));
}
}

test.execute_round(ExecutionRoundType::OrdinaryRound);

let mut total_accumulated_priority = 0;
let mut total_priority_credit = 0;
for (i, canister) in test.state().canisters_iter().enumerate() {
// Now all the canisters should be executed once.
prop_assert_eq!(canister.system_state.canister_metrics.executed, 1);
if i < num_canisters as usize / 2 {
// The first half of the canisters should have messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 1);
// The first half of the canisters should be executed two rounds ago.
prop_assert_eq!(
canister.scheduler_state.last_full_execution_round.get(),
test.last_round().get() - 1
);
} else {
// The second half of the canisters should finish their messages.
prop_assert_eq!(canister.system_state.queues().ingress_queue_size(), 0);
// The second half of the canisters should be executed in the last round.
prop_assert_eq!(
canister.scheduler_state.last_full_execution_round,
test.last_round()
);
}
total_accumulated_priority += canister.scheduler_state.accumulated_priority.get();
total_priority_credit += canister.scheduler_state.priority_credit.get();
}
prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0);
}
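For intuition about the `total_accumulated_priority - total_priority_credit == 0` assertions in the deleted test: with multiplier m = scheduler_cores * number_of_canisters, n canisters, total compute allocation A percent, and F fully executed canisters, the deleted `finish_round` pass added per round

    priority credit:       P = F * 100 * m
    accumulated priority:  A * m + n * ((P - A * m) / n) = P    (exact here, since the test uses A = 0)

so the pass increased both totals by the same amount, which is what keeps their difference at zero.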
