Skip to content

Commit

Permalink
cleanup zombie task
Browse files Browse the repository at this point in the history
  • Loading branch information
b-yap committed May 10, 2024
1 parent 1c54466 commit db588a5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
13 changes: 12 additions & 1 deletion clients/vault/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ impl Service<VaultServiceConfig, Error> for VaultService {

async fn start(&mut self) -> Result<(), ServiceError<Error>> {
let result = self.run_service().await;

self.shutdown_wallet().await;

if let Err(error) = result {
let _ = self.shutdown.send(());
Err(error)
Expand Down Expand Up @@ -776,7 +779,7 @@ impl VaultService {
// purposefully _after_ register_vault_if_not_present and _before_ other calls
self.vault_id_manager.fetch_vault_ids().await?;

let wallet = self.stellar_wallet.write().await;
let mut wallet = self.stellar_wallet.write().await;
let vault_public_key = wallet.public_key();
let is_public_network = wallet.is_public_network();

Expand Down Expand Up @@ -940,4 +943,12 @@ impl VaultService {
tracing::info!("Got new block at height {startup_height}");
Ok(startup_height)
}

/// shuts down the resubmission task running in the background
async fn shutdown_wallet(&self) {
tracing::info!("shutdown_wallet(): stop the resubmission scheduler");
let mut wallet = self.stellar_wallet.write().await;
wallet.stop_periodic_resubmission_of_transactions().await;
drop(wallet);
}
}
35 changes: 30 additions & 5 deletions clients/wallet/src/resubmissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use primitives::{
TransactionEnvelopeExt,
};
use std::time::Duration;
use tokio::time::sleep;
use tokio::{sync::mpsc, time::sleep};

use crate::horizon::responses::TransactionsResponseIter;
#[cfg(test)]
Expand All @@ -26,10 +26,26 @@ const MAXIMUM_TX_FEE: u32 = 10_000_000; // 1 XLM

#[cfg_attr(test, mockable)]
impl StellarWallet {
/// sends a signal to stop the resubmission task
pub async fn stop_periodic_resubmission_of_transactions(&mut self) {
match &self.resubmission_end_signal {
None => {
tracing::warn!("stop_periodic_resubmission_of_transactions(): no schedule to stop");
},
Some(sender) =>
if let Err(e) = sender.send(()).await {
tracing::warn!("stop_periodic_resubmission_of_transactions(): failed to send a stop message to scheduler: {e:?}");
},
}
}
/// reads in storage the failed (but recoverable) transactions and submit again to Stellar.
pub async fn start_periodic_resubmission_of_transactions_from_cache(
&self,
&mut self,
interval_in_seconds: u64,
) {
// to make sure we don't leave the thread idle, use this channel to properly shut it down.
let (sender, mut receiver) = mpsc::channel(2);

// Perform the resubmission
self._resubmit_transactions_from_cache().await;

Expand All @@ -40,11 +56,18 @@ impl StellarWallet {
tokio::spawn(async move {
let me_clone = Arc::clone(&me);
loop {
pause_process_in_secs(interval_in_seconds).await;
// a shutdown message was sent. Stop the loop.
if let Some(_) = receiver.recv().await {
tracing::info!("start_periodic_resubmission_of_transactions_from_cache(): scheduler stopped.");
break;
}

pause_process_in_secs(interval_in_seconds).await;
me_clone._resubmit_transactions_from_cache().await;
}
});

self.resubmission_end_signal = Some(sender)
}

#[doc(hidden)]
Expand Down Expand Up @@ -908,7 +931,7 @@ mod test {
let wallet = wallet_with_storage("resources/resubmit_transactions_works")
.expect("should return a wallet")
.clone();
let wallet = wallet.write().await;
let mut wallet = wallet.write().await;

let seq_number = wallet.get_sequence().await.expect("should return a sequence");

Expand Down Expand Up @@ -971,7 +994,7 @@ mod test {
.mock_safe(move |_, _| MockResult::Return(Box::pin(async move { false })));

// let's resubmit these 3 transactions
let _ = wallet.start_periodic_resubmission_of_transactions_from_cache(60).await;
wallet.start_periodic_resubmission_of_transactions_from_cache(60).await;

// We wait until the whole cache is empty because eventually all transactions should be
// handled
Expand All @@ -989,6 +1012,8 @@ mod test {
}
}

// shutdown the thread properly
wallet.stop_periodic_resubmission_of_transactions().await;
wallet.remove_cache_dir();
}
}
6 changes: 5 additions & 1 deletion clients/wallet/src/stellar_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use primitives::stellar::{
Asset as StellarAsset, Operation, PublicKey, SecretKey, StellarTypeToString, Transaction,
TransactionEnvelope,
};
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};

use crate::{
cache::WalletStateStorage,
Expand Down Expand Up @@ -54,6 +54,9 @@ pub struct StellarWallet {

/// a client to connect to Horizon
pub(crate) client: Client,

/// a sender to 'stop' a scheduled resubmission task
pub(crate) resubmission_end_signal: Option<mpsc::Sender<()>>,
}

impl StellarWallet {
Expand Down Expand Up @@ -121,6 +124,7 @@ impl StellarWallet {
max_retry_attempts_before_fallback: Self::DEFAULT_MAX_RETRY_ATTEMPTS_BEFORE_FALLBACK,
max_backoff_delay: Self::DEFAULT_MAX_BACKOFF_DELAY_IN_SECS,
client,
resubmission_end_signal: None,
})
}

Expand Down

0 comments on commit db588a5

Please sign in to comment.