Skip to content

Commit

Permalink
test narwhal manager
Browse files Browse the repository at this point in the history
  • Loading branch information
lavindir committed Dec 21, 2022
1 parent 868fccf commit 9b024c3
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 108 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ narwhal-executor = { path = "../../narwhal/executor" }
narwhal-node = { path = "../../narwhal/node" }
narwhal-types = { path = "../../narwhal/types" }
narwhal-worker = { path = "../../narwhal/worker" }
narwhal-test-utils = { path = "../../narwhal/test-utils" }
telemetry-subscribers.workspace = true
typed-store.workspace = true
typed-store-derive.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/src/narwhal_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#[cfg(test)]
#[path = "../unit_tests/narwhal_manager_tests.rs"]
pub mod narwhal_manager_tests;

use arc_swap::ArcSwap;
use fastcrypto::bls12381;
use fastcrypto::traits::KeyPair;
Expand Down
134 changes: 134 additions & 0 deletions crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::AuthorityState;
use crate::authority::AuthorityStore;
use crate::epoch::committee_store::CommitteeStore;
use crate::narwhal_manager::{
run_narwhal_manager, NarwhalConfiguration, NarwhalManager, NarwhalStartMessage,
};
use mysten_metrics::RegistryService;
use narwhal_config::SharedWorkerCache;
use narwhal_executor::ExecutionState;
use narwhal_types::ConsensusOutput;
use narwhal_worker::TrivialTransactionValidator;
use prometheus::Registry;
use std::sync::Arc;
use std::time::Duration;
use sui_types::crypto::KeypairTraits;
use test_utils::authority::test_and_configure_authority_configs;
use tokio::sync::mpsc::channel;

#[derive(Clone)]
struct NoOpExecutionState {}

#[async_trait::async_trait]
impl ExecutionState for NoOpExecutionState {
async fn handle_consensus_output(&self, _consensus_output: ConsensusOutput) {}

async fn last_executed_sub_dag_index(&self) -> u64 {
0
}
}

#[tokio::test]
async fn test_narwhal_manager() {
let configs = test_and_configure_authority_configs(1);
let registry_service = RegistryService::new(Registry::new());

let config = configs.validator_configs()[0].clone();
let consensus_config = config.consensus_config().unwrap();

let secret = Arc::pin(config.protocol_key_pair().copy());
let genesis = config.genesis().unwrap();
let genesis_committee = genesis.committee().unwrap();
let committee_store = Arc::new(CommitteeStore::new(
config.db_path().join("epochs"),
&genesis_committee,
None,
));

let store = Arc::new(
AuthorityStore::open(
&config.db_path().join("store"),
None,
genesis,
&committee_store,
)
.await
.unwrap(),
);

let state = AuthorityState::new(
config.protocol_public_key(),
secret,
store,
committee_store.clone(),
None,
None,
None,
&registry_service.default_registry(),
)
.await;

let system_state = state
.get_sui_system_state_object()
.expect("Reading Sui system state object cannot fail");

let transactions_addr = &config.consensus_config.as_ref().unwrap().address;
let mut narwhal_committee = system_state.get_current_epoch_narwhal_committee();
let mut worker_cache = system_state.get_current_epoch_narwhal_worker_cache(transactions_addr);

let execution_state = Arc::new(NoOpExecutionState {});

let narwhal_config = NarwhalConfiguration {
primary_keypair: config.protocol_key_pair().copy(),
network_keypair: config.network_key_pair.copy(),
worker_ids_and_keypairs: vec![(0, config.worker_key_pair().copy())],
storage_base_path: consensus_config.db_path().to_path_buf(),
parameters: consensus_config.narwhal_config().to_owned(),
tx_validator: TrivialTransactionValidator::default(),
registry_service,
};

let (tx_start, tr_start) = channel(1);
let (tx_stop, tr_stop) = channel(1);
let join_handle = tokio::spawn(run_narwhal_manager(narwhal_config, tr_start, tr_stop));

let narwhal_manager = NarwhalManager {
join_handle,
tx_start,
tx_stop,
};

// start narwhal
assert!(narwhal_manager
.tx_start
.send(NarwhalStartMessage {
committee: Arc::new(narwhal_committee.clone()),
shared_worker_cache: SharedWorkerCache::from(worker_cache.clone()),
execution_state: Arc::new(execution_state.clone())
})
.await
.is_ok());

tokio::time::sleep(Duration::from_millis(500)).await;

// stop narwhal
assert!(narwhal_manager.tx_stop.send(()).await.is_ok());

// advance epoch
narwhal_committee.epoch = 1;
worker_cache.epoch = 1;

// start narwhal with advanced epoch
assert!(narwhal_manager
.tx_start
.send(NarwhalStartMessage {
committee: Arc::new(narwhal_committee.clone()),
shared_worker_cache: SharedWorkerCache::from(worker_cache.clone()),
execution_state: Arc::new(execution_state)
})
.await
.is_ok());
}
1 change: 1 addition & 0 deletions crates/sui-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sui-types = { path = "../sui-types" }
mysten-metrics = { path = "../mysten-metrics" }
narwhal-network = { path = "../../narwhal/network" }
narwhal-types = { path = "../../narwhal/types" }
narwhal-config = { path = "../../narwhal/config" }
prometheus-closure-metric = { path = "../prometheus-closure-metric" }
typed-store.workspace = true
mysten-network.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod admin;
mod handle;
pub mod metrics;
pub use handle::SuiNodeHandle;
use narwhal_config::SharedWorkerCache;
use narwhal_types::TransactionsClient;
use sui_core::checkpoints::{
CheckpointMetrics, CheckpointService, CheckpointStore, SendCheckpointToStateSync,
Expand Down Expand Up @@ -518,7 +519,7 @@ impl SuiNode {

let msg = NarwhalStartMessage {
committee: committee.clone(),
shared_worker_cache: worker_cache,
shared_worker_cache: SharedWorkerCache::from(worker_cache),
execution_state: consensus_handler,
};
narwhal_manager.tx_start.send(msg).await?;
Expand Down Expand Up @@ -658,7 +659,7 @@ impl SuiNode {
.tx_start
.send(NarwhalStartMessage {
committee: Arc::new(narwhal_committee),
shared_worker_cache: worker_cache,
shared_worker_cache: SharedWorkerCache::from(worker_cache),
execution_state: consensus_handler,
})
.await?;
Expand Down
5 changes: 2 additions & 3 deletions crates/sui-types/src/sui_system_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
use fastcrypto::traits::ToFromBytes;
use move_core_types::{ident_str, identifier::IdentStr, language_storage::StructTag};
use multiaddr::Multiaddr;
use narwhal_config::{Committee as NarwhalCommittee, SharedWorkerCache, WorkerCache, WorkerIndex};
use narwhal_config::{Committee as NarwhalCommittee, WorkerCache, WorkerIndex};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -240,7 +240,7 @@ impl SuiSystemState {
pub fn get_current_epoch_narwhal_worker_cache(
&self,
transactions_address: &Multiaddr,
) -> SharedWorkerCache {
) -> WorkerCache {
let workers: BTreeMap<narwhal_crypto::PublicKey, WorkerIndex> = self
.validators
.active_validators
Expand Down Expand Up @@ -270,6 +270,5 @@ impl SuiSystemState {
workers,
epoch: self.epoch,
}
.into()
}
}
103 changes: 0 additions & 103 deletions narwhal/node/tests/reconfigure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,109 +179,6 @@ async fn run_client(
}
}

// #[ignore]
// #[tokio::test]
// async fn restart() {
// telemetry_subscribers::init_for_testing();
// let fixture = CommitteeFixture::builder().randomize_ports(true).build();
// let committee = fixture.committee();
// let worker_cache = fixture.shared_worker_cache();
//
// // Spawn the nodes.
// let mut rx_nodes = Vec::new();
// for a in fixture.authorities() {
// let (tx_output, rx_output) = channel(10);
// let (tx_node_reconfigure, rx_node_reconfigure) = channel(10);
//
// let execution_state = Arc::new(SimpleExecutionState::new(
// a.keypair().copy(),
// a.network_keypair().copy(),
// a.worker_keypairs(),
// fixture.worker_cache(),
// committee.clone(),
// tx_output,
// tx_node_reconfigure,
// ));
//
// let worker_keypairs = a.worker_keypairs();
// let worker_ids = 0..worker_keypairs.len() as u32;
// let worker_ids_and_keypairs = worker_ids.zip(worker_keypairs.into_iter()).collect();
//
// let committee = committee.clone();
// let worker_cache = worker_cache.clone();
//
// let parameters = Parameters {
// batch_size: 200,
// max_header_num_of_batches: 1,
// ..Parameters::default()
// };
//
// let keypair = a.keypair().copy();
// let network_keypair = a.network_keypair().copy();
// tokio::spawn(async move {
// NodeRestarter::watch(
// keypair,
// network_keypair,
// worker_ids_and_keypairs,
// &committee,
// worker_cache,
// /* base_store_path */ test_utils::temp_dir(),
// execution_state,
// parameters,
// TrivialTransactionValidator::default(),
// rx_node_reconfigure,
// &Registry::new(),
// )
// .await;
// });
//
// rx_nodes.push(rx_output);
// }
//
// // Give a chance to the nodes to start.
// tokio::task::yield_now().await;
//
// // Spawn some clients.
// let mut tx_clients = Vec::new();
// for a in fixture.authorities() {
// let (tx_client_reconfigure, rx_client_reconfigure) = channel(10);
// tx_clients.push(tx_client_reconfigure);
//
// let name = a.public_key();
// let worker_cache = worker_cache.clone();
// tokio::spawn(
// async move { run_client(name, worker_cache.clone(), rx_client_reconfigure).await },
// );
// }
//
// // Listen to the outputs.
// let mut handles = Vec::new();
// for (tx, mut rx) in tx_clients.into_iter().zip(rx_nodes.into_iter()) {
// handles.push(tokio::spawn(async move {
// let mut current_epoch = 0u64;
//
// while let Some(epoch) = rx.recv().await {
// println!("Received epoch {}", epoch);
// if epoch == 5 {
// return;
// }
// if epoch > current_epoch {
// current_epoch = epoch;
// tx.send(current_epoch).await.unwrap();
// }
// }
//
// if current_epoch < 5 {
// panic!("Node never reached epoch 5, something broke our connection");
// }
// }));
// }
//
// try_join_all(handles)
// .await
// .expect("No error should occurred");
// }

#[ignore]
#[tokio::test]
async fn epoch_change() {
Expand Down

0 comments on commit 9b024c3

Please sign in to comment.