Skip to content

Commit

Permalink
Merge pull request exonum#256 from DarkEld3r/dynamic-timeout-adjuster-3
Browse files Browse the repository at this point in the history
Timeout adjusters refactoring
  • Loading branch information
defuz committed Aug 1, 2017
2 parents 3ffd2ea + ee14da1 commit d69f971
Show file tree
Hide file tree
Showing 25 changed files with 481 additions and 120 deletions.
215 changes: 209 additions & 6 deletions exonum/src/blockchain/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ pub struct ConsensusConfig {
pub status_timeout: Milliseconds,
/// Peer exchange timeout.
pub peers_timeout: Milliseconds,
/// Proposal timeout after committing a block.
pub propose_timeout: Milliseconds,
/// Maximum number of transactions per block.
pub txs_block_limit: u32,
/// `TimeoutAdjuster` configuration.
pub timeout_adjuster: TimeoutAdjusterConfig,
}

impl Default for ConsensusConfig {
fn default() -> Self {
ConsensusConfig {
round_timeout: 3000,
propose_timeout: 500,
status_timeout: 5000,
peers_timeout: 10000,
txs_block_limit: 1000,
timeout_adjuster: TimeoutAdjusterConfig::Constant { timeout: 500 },
}
}
}
Expand Down Expand Up @@ -102,6 +102,49 @@ impl StoredConfiguration {
}
}

// Check timeout adjuster.
match config.consensus.timeout_adjuster {
// There is no need to validate `Constant` timeout adjuster.
TimeoutAdjusterConfig::Constant { .. } => (),
TimeoutAdjusterConfig::Dynamic { min, max, .. } => {
if min >= max {
return Err(JsonError::custom(format!(
"Dynamic adjuster: minimal timeout should be less then maximal: \
min = {}, max = {}",
min,
max
)));
}
}
TimeoutAdjusterConfig::MovingAverage {
min,
max,
adjustment_speed,
optimal_block_load,
} => {
if min >= max {
return Err(JsonError::custom(format!(
"Moving average adjuster: minimal timeout must be less then maximal: \
min = {}, max = {}",
min,
max
)));
}
if adjustment_speed <= 0. || adjustment_speed > 1. {
return Err(JsonError::custom(format!(
"Moving average adjuster: adjustment speed must be in the (0..1] range: {}",
adjustment_speed,
)));
}
if optimal_block_load <= 0. || optimal_block_load > 1. {
return Err(JsonError::custom(format!(
"Moving average adjuster: block load must be in the (0..1] range: {}",
adjustment_speed,
)));
}
}
}

Ok(config)
}
}
Expand All @@ -121,9 +164,44 @@ impl StorageValue for StoredConfiguration {
}
}

/// `TimeoutAdjuster` config.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(tag = "type")]
pub enum TimeoutAdjusterConfig {
/// Constant timeout adjuster config.
Constant {
/// Timeout value.
timeout: Milliseconds,
},
/// Dynamic timeout adjuster configuration.
Dynamic {
/// Minimal timeout.
min: Milliseconds,
/// Maximal timeout.
max: Milliseconds,
/// Transactions threshold starting from which the adjuster returns the minimal timeout.
threshold: u32,
},
/// Moving average timeout adjuster configuration.
MovingAverage {
/// Minimal timeout.
min: Milliseconds,
/// Maximal timeout.
max: Milliseconds,
/// Speed of the adjustment.
adjustment_speed: f64,
/// Optimal block load depending on the `txs_block_limit` from the `ConsensusConfig`.
optimal_block_load: f64,
},
}

#[cfg(test)]
mod tests {
use toml;
use serde::{Serialize, Deserialize};

use std::fmt::Debug;

use crypto::{Seed, gen_keypair_from_seed};
use super::*;

Expand All @@ -139,7 +217,7 @@ mod tests {
#[test]
fn stored_configuration_serialize_deserialize() {
let configuration = create_test_configuration();
serialize_deserialize(&configuration);
assert_eq!(configuration, serialize_deserialize(&configuration));
}

#[test]
Expand All @@ -153,6 +231,122 @@ mod tests {
serialize_deserialize(&configuration);
}

#[test]
fn constant_adjuster_config_toml() {
let config = TimeoutAdjusterConfig::Constant { timeout: 500 };
check_toml_roundtrip(&config);
}

#[test]
fn dynamic_adjuster_config_toml() {
let config = TimeoutAdjusterConfig::Dynamic {
min: 1,
max: 1000,
threshold: 10,
};
check_toml_roundtrip(&config);
}

#[test]
fn moving_average_adjuster_config_toml() {
let config = TimeoutAdjusterConfig::MovingAverage {
min: 1,
max: 1000,
adjustment_speed: 0.5,
optimal_block_load: 0.75,
};
check_toml_roundtrip(&config);
}

#[test]
#[should_panic(expected = "Dynamic adjuster: minimal timeout should be less then maximal")]
fn dynamic_adjuster_min_max() {
let mut configuration = create_test_configuration();
configuration.consensus.timeout_adjuster = TimeoutAdjusterConfig::Dynamic {
min: 10,
max: 0,
threshold: 1,
};
serialize_deserialize(&configuration);
}

#[test]
#[should_panic(expected = "Moving average adjuster: minimal timeout must be less then maximal")]
fn moving_average_adjuster_min_max() {
let mut configuration = create_test_configuration();
configuration.consensus.timeout_adjuster = TimeoutAdjusterConfig::MovingAverage {
min: 10,
max: 0,
adjustment_speed: 0.7,
optimal_block_load: 0.5,
};
serialize_deserialize(&configuration);
}

// TODO: Remove `#[rustfmt_skip]` after https://github.com/rust-lang-nursery/rustfmt/issues/1777
// is fixed.
#[cfg_attr(rustfmt, rustfmt_skip)]
#[test]
#[should_panic(expected = "Moving average adjuster: adjustment speed must be in the (0..1]")]
fn moving_average_adjuster_negative_adjustment_speed() {
let mut configuration = create_test_configuration();
configuration.consensus.timeout_adjuster = TimeoutAdjusterConfig::MovingAverage {
min: 1,
max: 20,
adjustment_speed: -0.7,
optimal_block_load: 0.5,
};
serialize_deserialize(&configuration);
}

// TODO: Remove `#[rustfmt_skip]` after https://github.com/rust-lang-nursery/rustfmt/issues/1777
// is fixed.
#[cfg_attr(rustfmt, rustfmt_skip)]
#[test]
#[should_panic(expected = "Moving average adjuster: adjustment speed must be in the (0..1]")]
fn moving_average_adjuster_invalid_adjustment_speed() {
let mut configuration = create_test_configuration();
configuration.consensus.timeout_adjuster = TimeoutAdjusterConfig::MovingAverage {
min: 10,
max: 20,
adjustment_speed: 1.5,
optimal_block_load: 0.5,
};
serialize_deserialize(&configuration);
}

// TODO: Remove `#[rustfmt_skip]` after https://github.com/rust-lang-nursery/rustfmt/issues/1777
// is fixed.
#[cfg_attr(rustfmt, rustfmt_skip)]
#[test]
#[should_panic(expected = "Moving average adjuster: block load must be in the (0..1] range")]
fn moving_average_adjuster_negative_block_load() {
let mut configuration = create_test_configuration();
configuration.consensus.timeout_adjuster = TimeoutAdjusterConfig::MovingAverage {
min: 10,
max: 20,
adjustment_speed: 0.7,
optimal_block_load: -0.5,
};
serialize_deserialize(&configuration);
}

// TODO: Remove `#[rustfmt_skip]` after https://github.com/rust-lang-nursery/rustfmt/issues/1777
// is fixed.
#[cfg_attr(rustfmt, rustfmt_skip)]
#[test]
#[should_panic(expected = "Moving average adjuster: block load must be in the (0..1] range")]
fn moving_average_adjuster_invalid_block_load() {
let mut configuration = create_test_configuration();
configuration.consensus.timeout_adjuster = TimeoutAdjusterConfig::MovingAverage {
min: 10,
max: 20,
adjustment_speed: 0.7,
optimal_block_load: 2.0,
};
serialize_deserialize(&configuration);
}

fn create_test_configuration() -> StoredConfiguration {
let validator_keys = (1..4)
.map(|i| {
Expand All @@ -172,8 +366,17 @@ mod tests {
}
}

fn serialize_deserialize(configuration: &StoredConfiguration) {
fn serialize_deserialize(configuration: &StoredConfiguration) -> StoredConfiguration {
let serialized = configuration.try_serialize().unwrap();
let _ = StoredConfiguration::try_deserialize(&serialized).unwrap();
StoredConfiguration::try_deserialize(&serialized).unwrap()
}

fn check_toml_roundtrip<T>(original: &T)
where
for<'de> T: Serialize + Deserialize<'de> + PartialEq + Debug,
{
let toml = toml::to_string(original).unwrap();
let deserialized: T = toml::from_str(&toml).unwrap();
assert_eq!(*original, deserialized);
}
}
2 changes: 1 addition & 1 deletion exonum/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use storage::{Patch, Database, Snapshot, Fork, Error};
pub use self::block::{Block, BlockProof, SCHEMA_MAJOR_VERSION};
pub use self::schema::{Schema, TxLocation, gen_prefix};
pub use self::genesis::GenesisConfig;
pub use self::config::{ValidatorKeys, StoredConfiguration, ConsensusConfig};
pub use self::config::{ValidatorKeys, StoredConfiguration, ConsensusConfig, TimeoutAdjusterConfig};
pub use self::service::{Service, Transaction, ServiceContext, ApiContext, SharedNodeState};

mod block;
Expand Down
1 change: 0 additions & 1 deletion exonum/src/helpers/fabric/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl NodeBuilder {
T: Into<OsString> + Clone,
{
ClapBackend::execute_cmd_string(self.commands.as_slice(), cmd_line) != Feedback::None

}

/// Parse cmd args, return `Node`, if run command found
Expand Down
3 changes: 1 addition & 2 deletions exonum/src/helpers/fabric/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ impl Command for GenerateNodeConfig {
"expected secret config path",
);


let addr = Self::addr(&context);
let common: CommonConfigTemplate =
ConfigFile::load(&common_config_path).expect("Could not load common config");
Expand Down Expand Up @@ -289,7 +288,7 @@ impl Command for GenerateNodeConfig {
node: node_pub_config,
common: common,
};
// save public config seperately
// Save public config separately.
ConfigFile::save(&shared_config, &pub_config_path).expect(
"Could not write public config file.",
);
Expand Down
7 changes: 2 additions & 5 deletions exonum/src/node/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,8 @@ where
self.broadcast_status();
self.add_status_timeout();

let timeout = self.timeout_adjuster.adjust_timeout(
&self.state,
&*self.blockchain.snapshot(),
);
self.state.set_propose_timeout(timeout);
// Adjust propose timeout after accepting a new block.
self.state.adjust_timeout(&*self.blockchain.snapshot());

// Handle queued transactions from services
for tx in new_txs {
Expand Down
27 changes: 5 additions & 22 deletions exonum/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ use blockchain::{SharedNodeState, Blockchain, Schema, GenesisConfig, Transaction
use messages::{Connect, RawMessage};
use api::{Api, public, private};

use self::timeout_adjuster::TimeoutAdjuster;

pub use self::state::{State, Round, Height, RequestData, ValidatorId, TxPool, ValidatorState};
pub use self::whitelist::Whitelist;

Expand Down Expand Up @@ -100,7 +98,6 @@ where
/// Known peer addresses.
// TODO: move this into peer exchange service
pub peer_discovery: Vec<SocketAddr>,
timeout_adjuster: Box<TimeoutAdjuster>,
}

/// Service configuration.
Expand Down Expand Up @@ -281,13 +278,11 @@ where
sender.get_time(),
);

let mut timeout_adjuster = Box::new(timeout_adjuster::Constant::default());
let timeout = timeout_adjuster.adjust_timeout(&state, &*snapshot);
state.set_propose_timeout(timeout);
// Adjust propose timeout for the first time.
state.adjust_timeout(&*snapshot);

NodeHandler {
blockchain,
timeout_adjuster,
api_state,
state,
channel: sender,
Expand All @@ -300,17 +295,6 @@ where
&self.api_state
}


/// Sets new timeout adjuster.
pub fn set_timeout_adjuster(&mut self, adjuster: Box<timeout_adjuster::TimeoutAdjuster>) {
self.timeout_adjuster = adjuster;
}

/// Returns value of the `propose_timeout` field from the current `ConsensusConfig`.
pub fn propose_timeout(&self) -> Milliseconds {
self.state().consensus_config().propose_timeout
}

/// Returns value of the `round_timeout` field from the current `ConsensusConfig`.
pub fn round_timeout(&self) -> Milliseconds {
self.state().consensus_config().round_timeout
Expand Down Expand Up @@ -417,9 +401,9 @@ where

/// Adds `NodeTimeout::Propose` timeout to the channel.
pub fn add_propose_timeout(&mut self) {
let adjusted_propose_timeout = self.state.propose_timeout();
let adjusted_timeout = self.state.propose_timeout();
let time = self.round_start_time(self.state.round()) +
Duration::from_millis(adjusted_propose_timeout);
Duration::from_millis(adjusted_timeout);

trace!(
"ADD PROPOSE TIMEOUT: time={:?}, height={}, round={}",
Expand Down Expand Up @@ -529,8 +513,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"NodeHandler {{ channel: Channel {{ .. }}, blockchain: {:?}, \
peer_discovery: {:?}, timeout_adjuster: Box<TimeoutAdjuster> }}",
"NodeHandler {{ channel: Channel {{ .. }}, blockchain: {:?}, peer_discovery: {:?} }}",
self.blockchain,
self.peer_discovery
)
Expand Down
Loading

0 comments on commit d69f971

Please sign in to comment.