diff --git a/CHANGELOG.md b/CHANGELOG.md index 70c15e2a009..04c0facdc79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ As a minor extension, we have adopted a slightly different versioning convention ## Mithril Distribution [XXXX.X] - UNRELEASED +- `mithril-aggregator` node produces artifact for different signed entity types in parallel. + - **UNSTABLE** Cardano transactions certification: - Make Cardano transaction signing settings configurable via the CD. diff --git a/Cargo.lock b/Cargo.lock index 2ad43adc9b3..21e04511710 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3547,7 +3547,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.5.42" +version = "0.5.43" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 576a9674435..51a9e55e0e6 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.5.42" +version = "0.5.43" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index b8adfce7234..15eb4920ea1 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -1135,6 +1135,7 @@ impl DependenciesBuilder { mithril_stake_distribution_artifact_builder, cardano_immutable_files_full_artifact_builder, cardano_transactions_artifact_builder, + self.get_signed_entity_lock().await?, )); // Compute the cache pool for prover service diff --git a/mithril-aggregator/src/services/signed_entity.rs b/mithril-aggregator/src/services/signed_entity.rs index 98aa46c41d2..73d0e495f23 100644 --- a/mithril-aggregator/src/services/signed_entity.rs +++ b/mithril-aggregator/src/services/signed_entity.rs @@ -2,11 +2,12 @@ //! //! This service is responsible for dealing with [SignedEntity] type. //! It creates [Artifact] that can be accessed by clients. -use anyhow::Context; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use chrono::Utc; use slog_scope::info; use std::sync::Arc; +use tokio::task::JoinHandle; use mithril_common::{ entities::{ @@ -15,6 +16,7 @@ use mithril_common::{ Snapshot, }, signable_builder::Artifact, + signed_entity_type_lock::SignedEntityTypeLock, StdResult, }; @@ -35,7 +37,7 @@ pub trait SignedEntityService: Send + Sync { &self, signed_entity_type: SignedEntityType, certificate: &Certificate, - ) -> StdResult<()>; + ) -> StdResult>>; /// Return a list of signed snapshots order by creation date descending. async fn get_last_signed_snapshots( @@ -69,6 +71,7 @@ pub trait SignedEntityService: Send + Sync { } /// Mithril ArtifactBuilder Service +#[derive(Clone)] pub struct MithrilSignedEntityService { signed_entity_storer: Arc, mithril_stake_distribution_artifact_builder: @@ -77,6 +80,7 @@ pub struct MithrilSignedEntityService { Arc>, cardano_transactions_artifact_builder: Arc>, + signed_entity_type_lock: Arc, } impl MithrilSignedEntityService { @@ -92,15 +96,61 @@ impl MithrilSignedEntityService { cardano_transactions_artifact_builder: Arc< dyn ArtifactBuilder, >, + signed_entity_type_lock: Arc, ) -> Self { Self { signed_entity_storer, mithril_stake_distribution_artifact_builder, cardano_immutable_files_full_artifact_builder, cardano_transactions_artifact_builder, + signed_entity_type_lock, } } + async fn create_artifact_task( + &self, + signed_entity_type: SignedEntityType, + certificate: &Certificate, + ) -> StdResult<()> { + info!( + "MithrilSignedEntityService::create_artifact"; + "signed_entity_type" => ?signed_entity_type, + "certificate_hash" => &certificate.hash + ); + + let mut remaining_retries = 2; + let artifact = loop { + remaining_retries -= 1; + + match self + .compute_artifact(signed_entity_type.clone(), certificate) + .await + { + Err(error) if remaining_retries == 0 => break Err(error), + Err(_error) => (), + Ok(artifact) => break Ok(artifact), + }; + }?; + + let signed_entity = SignedEntityRecord { + signed_entity_id: artifact.get_id(), + signed_entity_type: signed_entity_type.clone(), + certificate_id: certificate.hash.clone(), + artifact: serde_json::to_string(&artifact)?, + created_at: Utc::now(), + }; + + self.signed_entity_storer + .store_signed_entity(&signed_entity) + .await + .with_context(|| { + format!( + "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'" + ) + })?; + Ok(()) + } + /// Compute artifact from signed entity type async fn compute_artifact( &self, @@ -165,44 +215,43 @@ impl SignedEntityService for MithrilSignedEntityService { &self, signed_entity_type: SignedEntityType, certificate: &Certificate, - ) -> StdResult<()> { - info!( - "MithrilSignedEntityService::create_artifact"; - "signed_entity_type" => ?signed_entity_type, - "certificate_hash" => &certificate.hash - ); - - let mut remaining_retries = 2; - let artifact = loop { - remaining_retries -= 1; - - match self - .compute_artifact(signed_entity_type.clone(), certificate) - .await - { - Err(error) if remaining_retries == 0 => break Err(error), - Err(_error) => (), - Ok(artifact) => break Ok(artifact), - }; - }?; - - let signed_entity = SignedEntityRecord { - signed_entity_id: artifact.get_id(), - signed_entity_type: signed_entity_type.clone(), - certificate_id: certificate.hash.clone(), - artifact: serde_json::to_string(&artifact)?, - created_at: Utc::now(), - }; - - self.signed_entity_storer - .store_signed_entity(&signed_entity) + ) -> StdResult>> { + if self + .signed_entity_type_lock + .is_locked(&signed_entity_type) .await - .with_context(|| { - format!( - "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'" - ) - })?; - Ok(()) + { + return Err(anyhow!( + "Signed entity type '{:?}' is already locked", + signed_entity_type + )); + } + + let service = self.clone(); + let certificate_cloned = certificate.clone(); + service + .signed_entity_type_lock + .lock(&signed_entity_type) + .await; + + Ok(tokio::task::spawn(async move { + let signed_entity_type_clone = signed_entity_type.clone(); + let service_clone = service.clone(); + let result = tokio::task::spawn(async move { + service_clone + .create_artifact_task(signed_entity_type_clone, &certificate_cloned) + .await + }) + .await; + service + .signed_entity_type_lock + .release(signed_entity_type.clone()) + .await; + + result.with_context(|| format!( + "Signed Entity Service can not store signed entity with type: '{signed_entity_type}'" + ))? + })) } async fn get_last_signed_snapshots( @@ -299,12 +348,15 @@ impl SignedEntityService for MithrilSignedEntityService { #[cfg(test)] mod tests { + use std::{sync::atomic::Ordering, time::Duration}; + use mithril_common::{ entities::{CardanoTransactionsSnapshot, Epoch}, signable_builder, test_utils::fake_data, }; use serde::{de::DeserializeOwned, Serialize}; + use std::sync::atomic::AtomicBool; use crate::artifact_builder::MockArtifactBuilder; use crate::database::repository::MockSignedEntityStorer; @@ -366,8 +418,92 @@ mod tests { Arc::new(self.mock_mithril_stake_distribution_artifact_builder), Arc::new(self.mock_cardano_immutable_files_full_artifact_builder), Arc::new(self.mock_cardano_transactions_artifact_builder), + Arc::new(SignedEntityTypeLock::default()), + ) + } + + fn build_artifact_builder_service_with_time_consuming_process( + mut self, + atomic_stop: Arc, + ) -> MithrilSignedEntityService { + struct LongArtifactBuilder { + atomic_stop: Arc, + snapshot: Snapshot, + } + + let snapshot = fake_data::snapshots(1).first().unwrap().to_owned(); + + #[async_trait] + impl ArtifactBuilder for LongArtifactBuilder { + async fn compute_artifact( + &self, + _beacon: CardanoDbBeacon, + _certificate: &Certificate, + ) -> StdResult { + let mut max_iteration = 100; + while !self.atomic_stop.load(Ordering::Relaxed) { + max_iteration -= 1; + if max_iteration <= 0 { + return Err(anyhow!("Test should handle the stop")); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + Ok(self.snapshot.clone()) + } + } + let cardano_immutable_files_full_long_artifact_builder = LongArtifactBuilder { + atomic_stop: atomic_stop.clone(), + snapshot: snapshot.clone(), + }; + + let artifact_clone: Arc = Arc::new(snapshot); + let signed_entity_artifact = serde_json::to_string(&artifact_clone).unwrap(); + self.mock_signed_entity_storer + .expect_store_signed_entity() + .withf(move |signed_entity| signed_entity.artifact == signed_entity_artifact) + .return_once(|_| Ok(())); + + MithrilSignedEntityService::new( + Arc::new(self.mock_signed_entity_storer), + Arc::new(self.mock_mithril_stake_distribution_artifact_builder), + Arc::new(cardano_immutable_files_full_long_artifact_builder), + Arc::new(self.mock_cardano_transactions_artifact_builder), + Arc::new(SignedEntityTypeLock::default()), ) } + + fn mock_artifact_processing< + T: Artifact + Clone + Serialize + 'static, + U: signable_builder::Beacon, + >( + &mut self, + artifact: T, + mock_that_provide_artifact: &dyn Fn( + &mut MockDependencyInjector, + ) -> &mut MockArtifactBuilder, + ) { + { + let artifact_cloned = artifact.clone(); + mock_that_provide_artifact(self) + .expect_compute_artifact() + .times(1) + .return_once(|_, _| Ok(artifact_cloned)); + } + { + let artifact_clone: Arc = Arc::new(artifact.clone()); + let artifact_json = serde_json::to_string(&artifact_clone).unwrap(); + self.mock_signed_entity_storer + .expect_store_signed_entity() + .withf(move |signed_entity| signed_entity.artifact == artifact_json) + .return_once(|_| Ok(())); + } + } + + fn mock_stake_distribution_processing(&mut self, artifact: MithrilStakeDistribution) { + self.mock_artifact_processing(artifact, &|mock_injector| { + &mut mock_injector.mock_mithril_stake_distribution_artifact_builder + }); + } } #[tokio::test] @@ -518,8 +654,170 @@ mod tests { let error_message_str = error_message.as_str(); artifact_builder_service - .create_artifact(signed_entity_type, &certificate) + .create_artifact_task(signed_entity_type, &certificate) .await .expect(error_message_str); } + + #[tokio::test] + async fn create_artifact_for_two_signed_entity_types_in_sequence_not_blocking() { + let atomic_stop = Arc::new(AtomicBool::new(false)); + let signed_entity_type_service = { + let mut mock_container = MockDependencyInjector::new(); + + let msd = create_stake_distribution(Epoch(1), 5); + mock_container.mock_stake_distribution_processing(msd); + + mock_container + .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone()) + }; + let certificate = fake_data::certificate("hash".to_string()); + + let signed_entity_type_immutable = + SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()); + let first_task_that_never_finished = signed_entity_type_service + .create_artifact(signed_entity_type_immutable, &certificate) + .await + .unwrap(); + + let signed_entity_type_msd = SignedEntityType::MithrilStakeDistribution(Epoch(1)); + let second_task_that_finish_first = signed_entity_type_service + .create_artifact(signed_entity_type_msd, &certificate) + .await + .unwrap(); + + second_task_that_finish_first.await.unwrap().unwrap(); + assert!(!first_task_that_never_finished.is_finished()); + + atomic_stop.swap(true, Ordering::Relaxed); + } + + #[tokio::test] + async fn create_artifact_lock_unlock_signed_entity_type_while_processing() { + let atomic_stop = Arc::new(AtomicBool::new(false)); + let signed_entity_type_service = MockDependencyInjector::new() + .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone()); + let certificate = fake_data::certificate("hash".to_string()); + + let signed_entity_type_immutable = + SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()); + assert!( + !signed_entity_type_service + .signed_entity_type_lock + .is_locked(&signed_entity_type_immutable) + .await + ); + let join_handle = signed_entity_type_service + .create_artifact(signed_entity_type_immutable.clone(), &certificate) + .await + .unwrap(); + + // Results are stored to finalize the task before assertions, + // ensuring 'atomic_stop' is always assigned a new value. + let is_locked = signed_entity_type_service + .signed_entity_type_lock + .is_locked(&signed_entity_type_immutable) + .await; + let is_finished = join_handle.is_finished(); + + atomic_stop.swap(true, Ordering::Relaxed); + join_handle.await.unwrap().unwrap(); + + assert!(is_locked); + assert!(!is_finished); + + assert!( + !signed_entity_type_service + .signed_entity_type_lock + .is_locked(&signed_entity_type_immutable) + .await + ); + } + + #[tokio::test] + async fn create_artifact_unlock_signed_entity_type_when_error() { + let signed_entity_type_service = { + let mut mock_container = MockDependencyInjector::new(); + mock_container + .mock_cardano_immutable_files_full_artifact_builder + .expect_compute_artifact() + .returning(|_, _| Err(anyhow::anyhow!("Error while computing artifact"))); + + mock_container.build_artifact_builder_service() + }; + let certificate = fake_data::certificate("hash".to_string()); + + let signed_entity_type_immutable = + SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()); + + let join_handle = signed_entity_type_service + .create_artifact(signed_entity_type_immutable.clone(), &certificate) + .await + .unwrap(); + + let error = join_handle.await.unwrap().unwrap_err(); + assert!( + error.to_string().contains("CardanoImmutableFilesFull"), + "Error should contains CardanoImmutableFilesFull but was: {}", + error + ); + + assert!( + !signed_entity_type_service + .signed_entity_type_lock + .is_locked(&signed_entity_type_immutable) + .await + ); + } + + #[tokio::test] + async fn create_artifact_unlock_signed_entity_type_when_panic() { + let signed_entity_type_service = + MockDependencyInjector::new().build_artifact_builder_service(); + let certificate = fake_data::certificate("hash".to_string()); + + let signed_entity_type_immutable = + SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()); + + let join_handle = signed_entity_type_service + .create_artifact(signed_entity_type_immutable.clone(), &certificate) + .await + .unwrap(); + + let error = join_handle.await.unwrap().unwrap_err(); + assert!( + error.to_string().contains("CardanoImmutableFilesFull"), + "Error should contains CardanoImmutableFilesFull but was: {}", + error + ); + + assert!( + !signed_entity_type_service + .signed_entity_type_lock + .is_locked(&signed_entity_type_immutable) + .await + ); + } + + #[tokio::test] + async fn create_artifact_for_a_signed_entity_type_already_lock_return_error() { + let atomic_stop = Arc::new(AtomicBool::new(false)); + let signed_entity_service = MockDependencyInjector::new() + .build_artifact_builder_service_with_time_consuming_process(atomic_stop.clone()); + let certificate = fake_data::certificate("hash".to_string()); + let signed_entity_type_immutable = + SignedEntityType::CardanoImmutableFilesFull(CardanoDbBeacon::default()); + + signed_entity_service + .create_artifact(signed_entity_type_immutable.clone(), &certificate) + .await + .unwrap(); + + signed_entity_service + .create_artifact(signed_entity_type_immutable, &certificate) + .await + .expect_err("Should return error when signed entity type is already locked"); + + atomic_stop.swap(true, Ordering::Relaxed); + } } diff --git a/mithril-aggregator/tests/test_extensions/aggregator_observer.rs b/mithril-aggregator/tests/test_extensions/aggregator_observer.rs index 6c809b9db06..4d95faa1f17 100644 --- a/mithril-aggregator/tests/test_extensions/aggregator_observer.rs +++ b/mithril-aggregator/tests/test_extensions/aggregator_observer.rs @@ -112,4 +112,34 @@ impl AggregatorObserver { .signed_entity_config .time_point_to_signed_entity(discriminant, &time_point)) } + + pub async fn is_last_signed_entity( + &self, + signed_entity_type_expected: &SignedEntityType, + ) -> StdResult { + match signed_entity_type_expected { + SignedEntityType::CardanoImmutableFilesFull(_) => Ok(Some(signed_entity_type_expected) + == self + .signed_entity_service + .get_last_signed_snapshots(1) + .await? + .first() + .map(|s| &s.signed_entity_type)), + SignedEntityType::MithrilStakeDistribution(_) => Ok(Some(signed_entity_type_expected) + == self + .signed_entity_service + .get_last_signed_mithril_stake_distributions(1) + .await? + .first() + .map(|s| &s.signed_entity_type)), + SignedEntityType::CardanoTransactions(_, _) => Ok(Some(signed_entity_type_expected) + == self + .signed_entity_service + .get_last_cardano_transaction_snapshot() + .await? + .map(|s| s.signed_entity_type) + .as_ref()), + _ => Ok(false), + } + } } diff --git a/mithril-aggregator/tests/test_extensions/expected_certificate.rs b/mithril-aggregator/tests/test_extensions/expected_certificate.rs index db37a23ef35..d3f4afab5fb 100644 --- a/mithril-aggregator/tests/test_extensions/expected_certificate.rs +++ b/mithril-aggregator/tests/test_extensions/expected_certificate.rs @@ -50,4 +50,8 @@ impl ExpectedCertificate { pub fn genesis_identifier(beacon: &CardanoDbBeacon) -> String { format!("genesis-{:?}", beacon) } + + pub fn get_signed_type(&self) -> Option { + self.signed_type.clone() + } } diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index 729ca5fe981..eb8b9699368 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -16,7 +16,7 @@ use mithril_common::{ digesters::{DumbImmutableDigester, DumbImmutableFileObserver, ImmutableFileObserver}, entities::{ BlockNumber, Certificate, CertificateSignature, ChainPoint, Epoch, ImmutableFileNumber, - SignedEntityTypeDiscriminants, Snapshot, StakeDistribution, TimePoint, + SignedEntityType, SignedEntityTypeDiscriminants, Snapshot, StakeDistribution, TimePoint, }, era::{adapters::EraReaderDummyAdapter, EraMarker, EraReader, SupportedEra}, test_utils::{ @@ -51,6 +51,13 @@ macro_rules! cycle_err { #[macro_export] macro_rules! assert_last_certificate_eq { ( $tester:expr, $expected_certificate:expr ) => {{ + if let Some(signed_type) = $expected_certificate.get_signed_type() { + $tester + .wait_until_signed_entity(&signed_type) + .await + .unwrap(); + } + let last_certificate = RuntimeTester::get_last_expected_certificate(&mut $tester) .await .unwrap(); @@ -568,4 +575,28 @@ impl RuntimeTester { Ok(cert_identifier) } + + /// Wait until the last stored signed entity of the given type + /// corresponds to the expected signed entity type + pub async fn wait_until_signed_entity( + &self, + signed_entity_type_expected: &SignedEntityType, + ) -> StdResult<()> { + let mut max_iteration = 100; + while !self + .observer + .is_last_signed_entity(signed_entity_type_expected) + .await? + { + max_iteration -= 1; + if max_iteration <= 0 { + return Err(anyhow!( + "Signed entity not found: {signed_entity_type_expected}" + )); + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + + Ok(()) + } }