Revert "Feature: move partition (#1326)" #1480

Merged
merged 2 commits into from
Dec 12, 2023
97 changes: 0 additions & 97 deletions actors/miner/src/deadline_state.rs
@@ -23,7 +23,6 @@ use super::{
     BitFieldQueue, ExpirationSet, Partition, PartitionSectorMap, PoStPartition, PowerPair,
     SectorOnChainInfo, Sectors, TerminationResult,
 };
 
-use crate::SECTORS_AMT_BITWIDTH;
 
 // Bitwidth of AMTs determined empirically from mutation patterns and projections of mainnet data.
@@ -103,102 +102,6 @@ impl Deadlines {
         self.due[deadline_idx as usize] = store.put_cbor(deadline, Code::Blake2b256)?;
         Ok(())
     }
-
-    pub fn move_partitions<BS: Blockstore>(
-        policy: &Policy,
-        store: &BS,
-        orig_deadline: &mut Deadline,
-        dest_deadline: &mut Deadline,
-        partitions: &BitField,
-    ) -> anyhow::Result<()> {
-        let mut orig_partitions = orig_deadline.partitions_amt(store)?;
-        let mut dest_partitions = dest_deadline.partitions_amt(store)?;
-
-        // even though we're moving partitions intact, we still need to update orig/dest `Deadline` accordingly.
-
-        if dest_partitions.count() + partitions.len() > policy.max_partitions_per_deadline {
-            return Err(actor_error!(
-                forbidden,
-                "partitions in dest_deadline will exceed max_partitions_per_deadline"
-            ))?;
-        }
-
-        let first_dest_partition_idx = dest_partitions.count();
-        for (i, orig_partition_idx) in partitions.iter().enumerate() {
-            let moving_partition = orig_partitions
-                .get(orig_partition_idx)?
-                .ok_or_else(|| actor_error!(not_found, "no partition {}", orig_partition_idx))?
-                .clone();
-            if !moving_partition.faults.is_empty() || !moving_partition.unproven.is_empty() {
-                return Err(actor_error!(forbidden, "partition with faults or unproven sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
-            }
-            if orig_deadline.early_terminations.get(orig_partition_idx) {
-                return Err(actor_error!(forbidden, "partition with early terminated sectors are not allowed to move, partition_idx {}", orig_partition_idx))?;
-            }
-            if !moving_partition.faulty_power.is_zero() {
-                return Err(actor_error!(
-                    illegal_state,
-                    "partition faulty_power should be zero when faults is empty, partition_idx {}",
-                    orig_partition_idx
-                ))?;
-            }
-
-            let dest_partition_idx = first_dest_partition_idx + i as u64;
-
-            let sector_count = moving_partition.sectors.len();
-            let live_sector_count = sector_count - moving_partition.terminated.len();
-
-            // start updating orig/dest `Deadline` here
-
-            orig_deadline.total_sectors -= sector_count;
-            orig_deadline.live_sectors -= live_sector_count;
-
-            dest_deadline.total_sectors += sector_count;
-            dest_deadline.live_sectors += live_sector_count;
-
-            orig_partitions.set(orig_partition_idx, Partition::new(store)?)?;
-            dest_partitions.set(dest_partition_idx, moving_partition)?;
-        }
-
-        // update expirations_epochs Cid of Deadline.
-        // Note that when moving a partition from `orig_expirations_epochs` to `dest_expirations_epochs`,
-        // we explicitly keep the `dest_epoch` the same as `orig_epoch`, this is by design of not re-quantizing.
-        {
-            let mut epochs_to_remove = Vec::<u64>::new();
-            let mut orig_expirations_epochs: Array<BitField, _> =
-                Array::load(&orig_deadline.expirations_epochs, store)?;
-            let mut dest_expirations_epochs: Array<BitField, _> =
-                Array::load(&dest_deadline.expirations_epochs, store)?;
-            orig_expirations_epochs.for_each_mut(|orig_epoch, orig_bitfield| {
-                let dest_epoch = orig_epoch;
-                let mut to_bitfield =
-                    dest_expirations_epochs.get(dest_epoch)?.cloned().unwrap_or_default();
-                for (i, partition_id) in partitions.iter().enumerate() {
-                    if orig_bitfield.get(partition_id) {
-                        orig_bitfield.unset(partition_id);
-                        to_bitfield.set(first_dest_partition_idx + i as u64);
-                    }
-                }
-                dest_expirations_epochs.set(dest_epoch, to_bitfield)?;
-
-                if orig_bitfield.is_empty() {
-                    epochs_to_remove.push(orig_epoch);
-                }
-
-                Ok(())
-            })?;
-            if !epochs_to_remove.is_empty() {
-                orig_expirations_epochs.batch_delete(epochs_to_remove, true)?;
-            }
-            orig_deadline.expirations_epochs = orig_expirations_epochs.flush()?;
-            dest_deadline.expirations_epochs = dest_expirations_epochs.flush()?;
-        }
-
-        orig_deadline.partitions = orig_partitions.flush()?;
-        dest_deadline.partitions = dest_partitions.flush()?;
-
-        Ok(())
-    }
 }
 
 /// Deadline holds the state for all sectors due at a specific deadline.
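The core of the reverted remapping above is a per-epoch bitfield rewrite: each moved partition's bit leaves the origin deadline's expiration bitfield and lands in the destination's bitfield at the same epoch, with only the partition index changing. A minimal standalone sketch of that step (the helper name and free-standing form are illustrative, not part of the PR; it reuses the BitField set/unset/get calls seen in the deleted code):

use fvm_ipld_bitfield::BitField;

// Illustrative helper: move the given origin partition indices into the
// destination bitfield for the *same* expiration epoch, mirroring the
// "no re-quantizing" design noted in the deleted code above.
fn remap_partitions_at_epoch(
    orig: &mut BitField,  // partitions expiring at this epoch in the origin deadline
    dest: &mut BitField,  // partitions expiring at the same epoch in the destination
    moving: &[u64],       // origin partition indices being moved, in order
    first_dest_idx: u64,  // next free partition index in the destination deadline
) {
    for (i, &orig_idx) in moving.iter().enumerate() {
        if orig.get(orig_idx) {
            orig.unset(orig_idx);
            // The epoch key never changes; only the partition index does.
            dest.set(first_dest_idx + i as u64);
        }
    }
}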
67 changes: 0 additions & 67 deletions actors/miner/src/deadlines.rs
@@ -127,73 +127,6 @@ pub fn deadline_available_for_compaction(
     )
 }
 
-/// the distance between from_deadline and to_deadline clockwise in deadline unit.
-fn deadline_distance(policy: &Policy, from_deadline: u64, to_deadline: u64) -> u64 {
-    if to_deadline >= from_deadline {
-        to_deadline - from_deadline
-    } else {
-        policy.wpost_period_deadlines - from_deadline + to_deadline
-    }
-}
-
-/// only allow moving to a nearer deadline from current one
-pub fn ensure_deadline_available_for_move(
-    policy: &Policy,
-    orig_deadline: u64,
-    dest_deadline: u64,
-    current_deadline: &DeadlineInfo,
-) -> Result<(), String> {
-    if !deadline_is_mutable(
-        policy,
-        current_deadline.period_start,
-        orig_deadline,
-        current_deadline.current_epoch,
-    ) {
-        return Err(format!(
-            "cannot move from a deadline {}, immutable at epoch {}",
-            orig_deadline, current_deadline.current_epoch
-        ));
-    }
-
-    if !deadline_is_mutable(
-        policy,
-        current_deadline.period_start,
-        dest_deadline,
-        current_deadline.current_epoch,
-    ) {
-        return Err(format!(
-            "cannot move to a deadline {}, immutable at epoch {}",
-            dest_deadline, current_deadline.current_epoch
-        ));
-    }
-
-    if deadline_distance(policy, current_deadline.index, dest_deadline)
-        >= deadline_distance(policy, current_deadline.index, orig_deadline)
-    {
-        return Err(format!(
-            "can only move to a deadline which is nearer from current deadline {}, dest_deadline {} is not nearer than orig_deadline {}",
-            current_deadline.index, dest_deadline, orig_deadline
-        ));
-    }
-
-    Ok(())
-}
-
-// returns the nearest deadline info with index `target_deadline` that has already occured from the point of view of the current deadline(including the current deadline).
-pub fn nearest_occured_deadline_info(
-    policy: &Policy,
-    current_deadline: &DeadlineInfo,
-    target_deadline: u64,
-) -> DeadlineInfo {
-    // Find the proving period start for the deadline in question.
-    let mut pp_start = current_deadline.period_start;
-    if current_deadline.index < target_deadline {
-        pp_start -= policy.wpost_proving_period
-    }
-
-    new_deadline_info(policy, pp_start, target_deadline, current_deadline.current_epoch)
-}
 
 // Determine current period start and deadline index directly from current epoch and
 // the offset implied by the proving period. This works correctly even for the state
 // of a miner actor without an active deadline cron
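The deleted deadline_distance measures clockwise distance with wraparound, and ensure_deadline_available_for_move uses it to reject any destination at least as far from the current deadline as the origin. A self-contained sketch with worked values, assuming the mainnet policy of 48 deadlines per proving period:

// Clockwise distance between deadlines, assuming wpost_period_deadlines = 48.
const WPOST_PERIOD_DEADLINES: u64 = 48;

fn deadline_distance(from_deadline: u64, to_deadline: u64) -> u64 {
    if to_deadline >= from_deadline {
        to_deadline - from_deadline
    } else {
        WPOST_PERIOD_DEADLINES - from_deadline + to_deadline
    }
}

fn main() {
    assert_eq!(deadline_distance(10, 14), 4); // no wraparound
    assert_eq!(deadline_distance(46, 2), 4);  // wraps through 47, 0, 1, 2
    // With the current deadline at 12, moving a partition from deadline 14
    // to deadline 10 would be rejected: the destination is farther (46 >= 2).
    assert!(deadline_distance(12, 10) >= deadline_distance(12, 14));
}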
108 changes: 37 additions & 71 deletions actors/miner/src/expiration_queue.rs
@@ -6,7 +6,6 @@ use std::convert::TryInto;
 
 use anyhow::{anyhow, Context};
 use cid::Cid;
-use fil_actors_runtime::network::EPOCHS_IN_DAY;
 use fil_actors_runtime::runtime::Policy;
 use fil_actors_runtime::{ActorDowncast, Array};
 use fvm_ipld_amt::{Error as AmtError, ValueMut};
@@ -644,16 +643,16 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
         Ok(())
     }
 
-    /// Note that the `epoch` parameter doesn't quantize, and assumes the entry for the epoch is non-empty.
     fn remove(
         &mut self,
-        epoch: ChainEpoch,
+        raw_epoch: ChainEpoch,
         on_time_sectors: &BitField,
         early_sectors: &BitField,
         active_power: &PowerPair,
         faulty_power: &PowerPair,
         pledge: &TokenAmount,
     ) -> anyhow::Result<()> {
+        let epoch = self.quant.quantize_up(raw_epoch);
         let mut expiration_set = self
             .amt
             .get(epoch.try_into()?)
@@ -777,67 +776,46 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
         sector_size: SectorSize,
         sectors: &[SectorOnChainInfo],
     ) -> anyhow::Result<Vec<SectorExpirationSet>> {
-        if sectors.is_empty() {
-            return Ok(Vec::new());
-        }
-
+        let mut declared_expirations = BTreeMap::<ChainEpoch, bool>::new();
         let mut sectors_by_number = BTreeMap::<u64, &SectorOnChainInfo>::new();
         let mut all_remaining = BTreeSet::<u64>::new();
-        let mut declared_expirations = BTreeSet::<i64>::new();
 
         for sector in sectors {
-            declared_expirations.insert(sector.expiration);
+            let q_expiration = self.quant.quantize_up(sector.expiration);
+            declared_expirations.insert(q_expiration, true);
             all_remaining.insert(sector.sector_number);
             sectors_by_number.insert(sector.sector_number, sector);
         }
 
-        let mut expiration_groups = Vec::<SectorExpirationSet>::with_capacity(sectors.len());
-
-        let mut old_end = 0i64;
-        for expiration in declared_expirations.iter() {
-            // Basically we're scanning [sector.expiration, sector.expiration+EPOCHS_IN_DAY) for active sectors.
-            // Since a sector may have been moved from another deadline, the possible range for an active sector is [sector.expiration, sector.expiration+EPOCHS_IN_DAY).
-            //
-            // And we're also trying to avoid scanning the same range twice by choosing a proper `start_at`.
-
-            let start_at = if *expiration > old_end {
-                *expiration
-            } else {
-                // +1 since the range is inclusive
-                old_end + 1
-            };
-            let new_end = (expiration + EPOCHS_IN_DAY - 1) as u64;
-
-            // scan range [start_at, new_end] for active sectors of interest
-            self.amt.for_each_while_ranged(Some(start_at as u64), None, |epoch, es| {
-                if epoch > new_end {
-                    // no need to scan any more
-                    return Ok(false);
-                }
-
-                let group = group_expiration_set(
-                    sector_size,
-                    &sectors_by_number,
-                    &mut all_remaining,
-                    es,
-                    epoch as ChainEpoch,
-                );
-
-                if !group.sector_epoch_set.sectors.is_empty() {
-                    expiration_groups.push(group);
-                }
-
-                Ok(epoch < new_end && !all_remaining.is_empty())
-            })?;
-
-            old_end = new_end as i64;
+        let mut expiration_groups =
+            Vec::<SectorExpirationSet>::with_capacity(declared_expirations.len());
+
+        for (&expiration, _) in declared_expirations.iter() {
+            let es = self.may_get(expiration)?;
+
+            let group = group_expiration_set(
+                sector_size,
+                &sectors_by_number,
+                &mut all_remaining,
+                es,
+                expiration,
+            );
+            if !group.sector_epoch_set.sectors.is_empty() {
+                expiration_groups.push(group);
+            }
         }
 
         // If sectors remain, traverse next in epoch order. Remaining sectors should be
         // rescheduled to expire soon, so this traversal should exit early.
         if !all_remaining.is_empty() {
             self.amt.for_each_while(|epoch, es| {
                 let epoch = epoch as ChainEpoch;
+                // If this set's epoch is one of our declared epochs, we've already processed it
+                // in the loop above, so skip processing here. Sectors rescheduled to this epoch
+                // would have been included in the earlier processing.
+                if declared_expirations.contains_key(&epoch) {
+                    return Ok(true);
+                }
 
                 // Sector should not be found in EarlyExpirations which holds faults. An implicit assumption
                 // of grouping is that it only returns sectors with active power. ExpirationQueue should not
@@ -848,7 +826,7 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
                     sector_size,
                     &sectors_by_number,
                     &mut all_remaining,
-                    es,
+                    es.clone(),
                     epoch,
                 );
 
@@ -933,7 +911,7 @@ fn group_expiration_set(
     sector_size: SectorSize,
     sectors: &BTreeMap<u64, &SectorOnChainInfo>,
     include_set: &mut BTreeSet<u64>,
-    es: &ExpirationSet,
+    es: ExpirationSet,
     expiration: ChainEpoch,
 ) -> SectorExpirationSet {
     let mut sector_numbers = Vec::new();
@@ -949,26 +927,14 @@ impl<'db, BS: Blockstore> ExpirationQueue<'db, BS> {
         }
     }
 
-    if sector_numbers.is_empty() {
-        SectorExpirationSet {
-            sector_epoch_set: SectorEpochSet {
-                epoch: expiration,
-                sectors: sector_numbers,
-                power: total_power,
-                pledge: total_pledge,
-            },
-            expiration_set: ExpirationSet::default(),
-        }
-    } else {
-        SectorExpirationSet {
-            sector_epoch_set: SectorEpochSet {
-                epoch: expiration,
-                sectors: sector_numbers,
-                power: total_power,
-                pledge: total_pledge,
-            },
-            expiration_set: es.clone(), // lazy clone
-        }
+    SectorExpirationSet {
+        sector_epoch_set: SectorEpochSet {
+            epoch: expiration,
+            sectors: sector_numbers,
+            power: total_power,
+            pledge: total_pledge,
+        },
+        expiration_set: es,
     }
 }
 
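Both restored paths lean on quantization: remove now re-quantizes its raw epoch, and the restored grouping loop keys its lookups by each sector's quantized expiration. A standalone sketch of the assumed quantize_up semantics (round up to the next epoch congruent to offset modulo unit; not the actors' exact implementation):

// Assumed semantics of QuantSpec::quantize_up: round `epoch` up to the next
// quantization boundary, i.e. the next value congruent to offset mod unit.
fn quantize_up(epoch: i64, unit: i64, offset: i64) -> i64 {
    let remainder = (epoch - offset).rem_euclid(unit);
    if remainder == 0 {
        epoch
    } else {
        epoch + (unit - remainder)
    }
}

fn main() {
    // With unit = 60 and offset = 17, epochs snap up to ..., 77, 137, 197, ...
    assert_eq!(quantize_up(77, 60, 17), 77);  // already on a boundary
    assert_eq!(quantize_up(78, 60, 17), 137); // rounds up, never down
    // Sectors declared at raw epochs 80 and 100 land in the same expiration
    // set, which is why the grouping loop can key lookups by quantized epoch.
    assert_eq!(quantize_up(80, 60, 17), quantize_up(100, 60, 17));
}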