Fix for msgQ clean
Slava committed May 19, 2023
1 parent 86efa6c commit 006c7b0
Showing 2 changed files with 85 additions and 30 deletions.
7 changes: 5 additions & 2 deletions src/validator/collator.rs
@@ -1850,8 +1850,10 @@ impl Collator {
}
}
if self.check_cutoff_timeout() {
log::warn!("{}: TIMEOUT ({}ms) is elapsed, stop processing import_new_shard_top_blocks",
self.collated_block_descr, self.cutoff_timeout.as_millis());
log::warn!(
"{}: TIMEOUT ({}ms) is elapsed, stop processing import_new_shard_top_blocks",
self.collated_block_descr, self.cutoff_timeout.as_millis()
);
break
}
}
@@ -2294,6 +2296,7 @@ impl Collator {
// Newly generated messages will be executed next iteration (only after waiting).

let mut new_messages = std::mem::take(&mut collator_data.new_messages);
// TODO: we could drain these items in sorted (created_lt, hash) order later
while let Some(NewMessage{ lt_hash: (created_lt, hash), msg, tr_cell, prefix }) = new_messages.pop() {
let info = msg.int_header().ok_or_else(|| error!("message is not internal"))?;
let fwd_fee = *info.fwd_fee();
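The TODO above hints that new_messages could be drained in ascending (created_lt, hash) order instead of raw pop() order. A minimal self-contained sketch of that idea; NewMsg and pop_sorted are illustrative stand-ins, not the crate's NewMessage type:

struct NewMsg {
    created_lt: u64,
    hash: [u8; 32],
}

// Sort descending by (created_lt, hash) so that pop() from the tail yields
// messages in ascending order, mirroring the Vec-as-stack style above.
fn pop_sorted(mut msgs: Vec<NewMsg>) -> Vec<NewMsg> {
    msgs.sort_by(|a, b| (b.created_lt, b.hash).cmp(&(a.created_lt, a.hash)));
    let mut out = Vec::new();
    while let Some(msg) = msgs.pop() {
        out.push(msg); // smallest (created_lt, hash) comes off first
    }
    out
}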
108 changes: 80 additions & 28 deletions src/validator/out_msg_queue.rs
@@ -246,7 +246,7 @@ impl OutMsgQueueInfoStuff {
let queue1 = Self::calc_split_queues(queue0, &s0)?;
log::info!("precalc_split_queues after block {}, TIME {}ms",
block_id, now.elapsed().as_millis());
engine.set_split_queues(block_id, queue0.clone(), queue1, usage_tree.build_visited_set());
engine.set_split_queues(block_id, queue0.clone(), queue1, usage_tree.build_visited_set());
} else {
log::trace!("precalc_split_queues {} already calculating or calculated", block_id);
}
@@ -883,36 +883,66 @@ impl MsgQueueManager {
let mut deleted = 0;

// Temp fix. Need to review and fix limits management further.
// TODO: put it to config
let mut processed_count_limit = 25_000;

queue.hashmap_filter(|_key, mut slice| {
if block_full {
log::warn!("BLOCK FULL when cleaning output queue, cleanup is partial");
log::warn!("BLOCK FULL when fast cleaning output queue, cleanup is partial");
partial = true;
return Ok(HashmapFilterResult::Stop)
}

// Temp fix. Need to review and fix limits management further.
processed_count_limit -= 1;
if processed_count_limit == 0 {
log::warn!("clean_out_msg_queue: stopped cleaning messages queue because of count limit");
log::warn!("clean_out_msg_queue: stopped fast cleaning messages queue because of count limit");
partial = true;
return Ok(HashmapFilterResult::Stop)
}
processed_count_limit -= 1;

let lt = u64::construct_from(&mut slice)?;
let enq = MsgEnqueueStuff::construct_from(&mut slice, lt)?;
// log::debug!("Scanning outbound {}", enq);
let (processed, end_lt) = self.already_processed(&enq)?;
if processed {
log::debug!("Outbound {} has already been delivered, dequeueing", enq);
// log::debug!("Outbound {} has already been delivered, dequeueing fast", enq);
block_full = on_message(Some((enq, end_lt)), self.next_out_queue_info.out_queue.data())?;
deleted += 1;
return Ok(HashmapFilterResult::Remove)
}
skipped += 1;
Ok(HashmapFilterResult::Accept)
})?;
// we reached processed_count_limit - try slow removal in LT_HASH order
if processed_count_limit == 0 {
// TODO: put it to config
processed_count_limit = 50;
let mut iter = MsgQueueMergerIterator::from_queue(&queue)?;
while let Some(k_v) = iter.next() {
let (key, enq, _created_lt, _) = k_v?;
if block_full {
log::warn!("BLOCK FULL when slow cleaning output queue, cleanup is partial");
partial = true;
break;
}
if processed_count_limit == 0 {
log::warn!("clean_out_msg_queue: stopped slow cleaning messages queue because of count limit");
partial = true;
break;
}
processed_count_limit -= 1;
let (processed, end_lt) = self.already_processed(&enq)?;
if !processed {
break;
}
// log::debug!("Outbound {} has already been delivered, dequeueing slow", enq);
queue.remove(SliceData::load_builder(key.write_to_new_cell()?)?)?;
block_full = on_message(Some((enq, end_lt)), self.next_out_queue_info.out_queue.data())?;
deleted += 1;
}
queue.after_remove()?;
}
log::debug!("Deleted {} messages from out_msg_queue, skipped {}", deleted, skipped);
self.next_out_queue_info.out_queue = queue;
// if (verbosity >= 2) {
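The hunk above splits the cleanup into two phases. A rough self-contained sketch of that shape, with a std HashMap standing in for the hashmap trie, delivered standing in for self.already_processed(), and the 25_000 / 50 budgets mirrored from the code above; all names here are illustrative:

use std::collections::HashMap;

fn clean(queue: &mut HashMap<u64, String>, delivered: impl Fn(&str) -> bool) -> bool {
    // Phase 1: fast scan in arbitrary storage order under a large budget.
    let mut budget = 25_000usize;
    let mut partial = false;
    queue.retain(|_lt, msg| {
        if budget == 0 {
            partial = true;
            return true; // budget spent: keep the rest untouched
        }
        budget -= 1;
        !delivered(msg) // drop delivered messages, keep undelivered ones
    });
    // Phase 2: slow scan in ascending lt order under a small budget; stop at
    // the first undelivered message, since everything after it is newer.
    if partial {
        let mut lts: Vec<u64> = queue.keys().copied().collect();
        lts.sort_unstable();
        let mut budget = 50usize;
        for lt in lts {
            if budget == 0 {
                break; // still partial: a later collation continues the job
            }
            budget -= 1;
            if !delivered(&queue[&lt]) {
                break;
            }
            queue.remove(&lt);
        }
    }
    partial
}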
@@ -953,7 +983,7 @@ impl MsgQueueManager {

impl MsgQueueManager {
/// create iterator for merging all output messages from all neighbors to our shard
pub fn merge_out_queue_iter(&self, shard: &ShardIdent) -> Result<MsgQueueMergerIterator> {
pub fn merge_out_queue_iter(&self, shard: &ShardIdent) -> Result<MsgQueueMergerIterator<BlockIdExt>> {
MsgQueueMergerIterator::from_manager(self, shard)
}
/// find enqueued message and return it with neighbor id
@@ -983,33 +1013,45 @@ impl MsgQueueManager {
}

#[derive(Eq, PartialEq)]
struct RootRecord {
struct RootRecord<T> {
lt: u64,
cursor: SliceData,
bit_len: usize,
key: BuilderData,
block_id: BlockIdExt
id: T
}

impl RootRecord {
impl<T: Eq> RootRecord<T> {
fn new(
lt: u64,
cursor: SliceData,
bit_len: usize,
key: BuilderData,
block_id: BlockIdExt
id: T
) -> Self {
Self {
lt,
cursor,
bit_len,
key,
block_id
id
}
}
fn from_cell(cell: &Cell, mut bit_len: usize, id: T) -> Result<Self> {
let mut cursor = SliceData::load_cell_ref(cell)?;
let key = cursor.get_label_raw(&mut bit_len, BuilderData::default())?;
let lt = cursor.get_next_u64()?;
Ok(Self {
lt,
cursor,
bit_len,
key,
id
})
}
}

impl Ord for RootRecord {
impl<T: Eq> Ord for RootRecord<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// first compare lt descending, because Vec is a stack
let mut cmp = self.lt.cmp(&other.lt);
@@ -1024,29 +1066,26 @@ impl Ord for RootRecord {
cmp.reverse()
}
}
impl PartialOrd for RootRecord {
impl<T: Eq> PartialOrd for RootRecord<T> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) }
}
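
// Aside, a standalone illustration (Rec is not a crate type): the reversed
// comparison above keeps a sorted Vec ordered descending, so pop() from the
// tail returns records in ascending (lt, hash) order.
use std::cmp::Ordering;

#[derive(Eq, PartialEq)]
struct Rec { lt: u64 }

impl Ord for Rec {
    fn cmp(&self, other: &Self) -> Ordering {
        self.lt.cmp(&other.lt).reverse() // natural order, reversed
    }
}
impl PartialOrd for Rec {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}

fn main() {
    let mut v = vec![Rec { lt: 3 }, Rec { lt: 1 }, Rec { lt: 2 }];
    v.sort(); // descending by lt: 3, 2, 1
    assert_eq!(v.pop().unwrap().lt, 1); // smallest lt pops first
}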

/// iterates messages in ascending (created_lt, hash) order
pub struct MsgQueueMergerIterator {
pub struct MsgQueueMergerIterator<T> {
// store branches descending by lt and hash because Vec works like a stack
roots: Vec<RootRecord>,
roots: Vec<RootRecord<T>>,
}

impl MsgQueueMergerIterator {
impl MsgQueueMergerIterator<BlockIdExt> {
pub fn from_manager(manager: &MsgQueueManager, shard: &ShardIdent) -> Result<Self> {
let shard_prefix = shard.shard_key(true);
let mut roots = vec![];
for nb in manager.neighbors.iter().filter(|nb| !nb.is_disabled()) {
let mut out_queue_short = nb.out_queue.clone();
out_queue_short.into_subtree_with_prefix(&shard_prefix, &mut 0)?;
if let Some(root) = out_queue_short.data() {
let mut cursor = SliceData::load_cell_ref(root)?;
let mut bit_len = out_queue_short.bit_len();
let key = cursor.get_label_raw(&mut bit_len, BuilderData::default())?;
let lt = cursor.get_next_u64()?;
roots.push(RootRecord::new(lt, cursor, bit_len, key, nb.block_id().clone()));
if let Some(cell) = out_queue_short.data() {
roots.push(RootRecord::from_cell(cell, out_queue_short.bit_len(), nb.block_id().clone())?);
// roots.push(RootRecord::new(lt, cursor, bit_len, key, nb.block_id().clone()));
}
}
if !roots.is_empty() {
Expand All @@ -1055,17 +1094,30 @@ impl MsgQueueMergerIterator {
}
Ok(Self { roots })
}
fn insert(&mut self, root: RootRecord) {
}

impl MsgQueueMergerIterator<u8> {
pub fn from_queue(out_queue: &OutMsgQueue) -> Result<Self> {
let mut roots = Vec::new();
if let Some(cell) = out_queue.data() {
roots.push(RootRecord::from_cell(cell, out_queue.bit_len(), 0)?);
}
Ok(Self { roots })
}
}
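
// Aside, a standalone sketch (not crate code): from_manager seeds one root
// per neighbor, from_queue a single root; the iterator then behaves as a
// k-way merge that always yields the globally smallest key next. The same
// merge idea over plain sorted vectors, with the hashmap-trie walk elided:
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Each source is sorted ascending; the result tags every value with the
// index of the source it came from, like the neighbor id in next_item().
fn merge_ascending(sources: &[Vec<u64>]) -> Vec<(u64, usize)> {
    let mut heap = BinaryHeap::new();
    for (src, s) in sources.iter().enumerate() {
        if let Some(&lt) = s.first() {
            heap.push(Reverse((lt, src, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((lt, src, pos))) = heap.pop() {
        out.push((lt, src));
        if let Some(&next) = sources[src].get(pos + 1) {
            heap.push(Reverse((next, src, pos + 1)));
        }
    }
    out
}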

impl<T: Clone + Eq> MsgQueueMergerIterator<T> {
fn insert(&mut self, root: RootRecord<T>) {
let idx = self.roots.binary_search(&root).unwrap_or_else(|x| x);
self.roots.insert(idx, root);
debug_assert!(self.roots.first().unwrap().lt >= self.roots.last().unwrap().lt);
}
fn next_item(&mut self) -> Result<Option<(OutMsgQueueKey, MsgEnqueueStuff, u64, BlockIdExt)>> {
fn next_item(&mut self) -> Result<Option<(OutMsgQueueKey, MsgEnqueueStuff, u64, T)>> {
while let Some(mut root) = self.roots.pop() {
if root.bit_len == 0 {
let key = OutMsgQueueKey::construct_from_cell(root.key.into_cell()?)?;
let enq = MsgEnqueueStuff::construct_from(&mut root.cursor, root.lt)?;
return Ok(Some((key, enq, root.lt, root.block_id)))
return Ok(Some((key, enq, root.lt, root.id)))
}
for idx in 0..2 {
let mut bit_len = root.bit_len - 1;
@@ -1074,15 +1126,15 @@ impl MsgQueueMergerIterator {
key.append_bit_bool(idx == 1)?;
key = cursor.get_label_raw(&mut bit_len, key)?;
let lt = cursor.get_next_u64()?;
self.insert(RootRecord::new(lt, cursor, bit_len, key, root.block_id.clone()));
self.insert(RootRecord::new(lt, cursor, bit_len, key, root.id.clone()));
}
}
Ok(None)
}
}
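
// Aside, the ordered-insert step of insert() above in isolation (standalone
// sketch): binary_search returns Ok(idx) on a match or Err(idx) with the
// position that preserves the ordering, so no re-sort is needed per insert.
fn insert_sorted(v: &mut Vec<u64>, x: u64) {
    let idx = v.binary_search(&x).unwrap_or_else(|pos| pos);
    v.insert(idx, x);
}

fn main() {
    let mut v = vec![1, 3, 5];
    insert_sorted(&mut v, 4);
    assert_eq!(v, vec![1, 3, 4, 5]);
}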

impl Iterator for MsgQueueMergerIterator {
type Item = Result<(OutMsgQueueKey, MsgEnqueueStuff, u64, BlockIdExt)>;
impl<T: Clone + Eq> Iterator for MsgQueueMergerIterator<T> {
type Item = Result<(OutMsgQueueKey, MsgEnqueueStuff, u64, T)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_item().transpose()
}
