Skip to content

Commit

Permalink
replay stage feed back program cost (solana-labs#17731)
Browse files Browse the repository at this point in the history
* replay stage feeds real-time per-program execution cost back to the cost model;

* the program execution cost table is initialized as an empty table and is no longer populated with hardcoded numbers;

* changed the cost unit to microseconds, using values collected from mainnet;

* add ExecuteCostTable with a fixed capacity for security reasons; when its limit is reached, programs that are both older AND less frequently seen are pushed out to make room for new programs.
  • Loading branch information
tao-stones committed Sep 24, 2021
1 parent 3c3addb commit 385a95f
Show file tree
Hide file tree
Showing 11 changed files with 542 additions and 176 deletions.
5 changes: 3 additions & 2 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::banking_stage::BankingStage;
use solana_core::{banking_stage::BankingStage, cost_model::CostModel};
use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node};
use solana_ledger::{
blockstore::Blockstore,
Expand All @@ -27,7 +27,7 @@ use solana_sdk::{
};
use solana_streamer::socket::SocketAddrSpace;
use std::{
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex},
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
thread::sleep,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -229,6 +229,7 @@ fn main() {
vote_receiver,
None,
replay_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
9 changes: 4 additions & 5 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use solana_streamer::socket::SocketAddrSpace;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};
use test::Bencher;

Expand Down Expand Up @@ -95,8 +95,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(Mutex::new(CostTracker::new(std::u32::MAX, std::u32::MAX))),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))),
);
});

Expand Down Expand Up @@ -221,8 +221,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
vote_receiver,
None,
s,
std::u32::MAX,
std::u32::MAX,
&Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))),
);
poh_recorder.lock().unwrap().set_bank(&bank);

Expand Down
57 changes: 26 additions & 31 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::{
cost_model::{CostModel, ACCOUNT_MAX_COST, BLOCK_MAX_COST},
cost_tracker::CostTracker,
packet_hasher::PacketHasher,
poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry},
poh_service::{self, PohService},
};
use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -58,7 +52,7 @@ use std::{
net::UdpSocket,
ops::DerefMut,
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex},
sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
time::Duration,
time::Instant,
Expand Down Expand Up @@ -236,6 +230,7 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
Self::new_with_cost_limit(
cluster_info,
Expand All @@ -244,8 +239,7 @@ impl BankingStage {
verified_vote_receiver,
transaction_status_sender,
gossip_vote_sender,
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
cost_model,
)
}

Expand All @@ -256,15 +250,12 @@ impl BankingStage {
verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
account_cost_limit: u32,
block_cost_limit: u32,
cost_model: &Arc<RwLock<CostModel>>,
) -> Self {
// shared immutable 'cost_model' that calculates transaction costs
// shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits.
let cost_model = Arc::new(CostModel::new(account_cost_limit, block_cost_limit));
let cost_tracker = Arc::new(Mutex::new(CostTracker::new(
cost_model.get_account_cost_limit(),
cost_model.get_block_cost_limit(),
cost_model.read().unwrap().get_account_cost_limit(),
cost_model.read().unwrap().get_block_cost_limit(),
)));
Self::new_num_threads(
cluster_info,
Expand All @@ -287,7 +278,7 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
Expand Down Expand Up @@ -409,7 +400,7 @@ impl BankingStage {
test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) {
let mut rebuffered_packets_len = 0;
Expand Down Expand Up @@ -554,7 +545,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder,
data_budget: &DataBudget,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> BufferedPacketsDecision {
let bank_start;
Expand Down Expand Up @@ -675,7 +666,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
data_budget: &DataBudget,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
Expand Down Expand Up @@ -1074,7 +1065,7 @@ impl BankingStage {
transaction_indexes: &[usize],
libsecp256k1_0_5_upgrade_enabled: bool,
votes_only: bool,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (Vec<HashedTransaction<'static>>, Vec<usize>, Vec<usize>) {
// Making a snapshot of shared cost_tracker by clone(), drop lock immediately.
Expand All @@ -1096,11 +1087,11 @@ impl BankingStage {
tx.verify_precompiles(libsecp256k1_0_5_upgrade_enabled)
.ok()?;

// Get transaction cost via immutable cost_model; try to add cost to
// Get transaction cost via cost_model; try to add cost to
// local copy of cost_tracker, if succeeded, local copy is updated
// and transaction added to valid list; otherwise, transaction is
// added to retry list. No locking here.
let tx_cost = cost_model.calculate_cost(&tx);
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx);
let result = cost_tracker.try_add(tx_cost);
if result.is_err() {
debug!("transaction {:?} would exceed limit: {:?}", tx, result);
Expand Down Expand Up @@ -1173,7 +1164,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> (usize, usize, Vec<usize>) {
let mut packet_conversion_time = Measure::start("packet_conversion");
Expand Down Expand Up @@ -1212,7 +1203,7 @@ impl BankingStage {
// applying cost of processed transactions to shared cost_tracker
transactions.iter().enumerate().for_each(|(index, tx)| {
if !unprocessed_tx_indexes.iter().any(|&i| i == index) {
let tx_cost = cost_model.calculate_cost(&tx.transaction());
let tx_cost = cost_model.read().unwrap().calculate_cost(&tx.transaction());
let mut guard = cost_tracker.lock().unwrap();
let _result = guard.try_add(tx_cost);
drop(guard);
Expand Down Expand Up @@ -1256,7 +1247,7 @@ impl BankingStage {
transaction_indexes: &[usize],
my_pubkey: &Pubkey,
next_leader: Option<Pubkey>,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Vec<usize> {
// Check if we are the next leader. If so, let's not filter the packets
Expand Down Expand Up @@ -1329,7 +1320,7 @@ impl BankingStage {
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
recorder: &TransactionRecorder,
cost_model: &Arc<CostModel>,
cost_model: &Arc<RwLock<CostModel>>,
cost_tracker: &Arc<Mutex<CostTracker>>,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
Expand Down Expand Up @@ -1562,6 +1553,7 @@ fn next_leader_tpu_forwards(
#[cfg(test)]
mod tests {
use super::*;
use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST};
use crossbeam_channel::unbounded;
use itertools::Itertools;
use solana_gossip::{cluster_info::Node, contact_info::ContactInfo};
Expand Down Expand Up @@ -1631,6 +1623,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
drop(verified_sender);
drop(vote_sender);
Expand Down Expand Up @@ -1676,6 +1669,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);
trace!("sending bank");
drop(verified_sender);
Expand Down Expand Up @@ -1745,6 +1739,7 @@ mod tests {
vote_receiver,
None,
gossip_vote_sender,
&Arc::new(RwLock::new(CostModel::default())),
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1892,7 +1887,7 @@ mod tests {
2,
None,
gossip_vote_sender,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand Down Expand Up @@ -2717,7 +2712,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand All @@ -2738,7 +2733,7 @@ mod tests {
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand Down Expand Up @@ -2808,7 +2803,7 @@ mod tests {
test_fn,
&BankingStageStats::default(),
&recorder,
&Arc::new(CostModel::default()),
&Arc::new(RwLock::new(CostModel::default())),
&Arc::new(Mutex::new(CostTracker::new(
ACCOUNT_MAX_COST,
BLOCK_MAX_COST,
Expand Down
Loading

0 comments on commit 385a95f

Please sign in to comment.